eshanized_polaris_core/
cluster.rs

1//! Cluster management and orchestration.
2//!
3//! This module provides the main cluster API for submitting and managing tasks.
4
5use crate::config::{ClusterConfig, NodeConfig};
6use crate::errors::{PolarisError, PolarisResult};
7use crate::executor::Executor;
8use crate::node::{Node, NodeRegistry, NodeStatus};
9use crate::observability::MetricsCollector;
10use crate::scheduler::{RoundRobinScheduler, Scheduler};
11use crate::task::{Task, TaskHandle, TaskId, TaskResult, TaskStatus};
12use std::sync::Arc;
13use std::time::Duration;
14
15/// Main cluster handle
16#[derive(Clone)]
17pub struct Cluster {
18    config: Arc<ClusterConfig>,
19    registry: NodeRegistry,
20    scheduler: Arc<dyn Scheduler>,
21    executor: Executor,
22    metrics: MetricsCollector,
23    tasks: Arc<dashmap::DashMap<TaskId, Task>>,
24    local_mode: bool,
25}
26
27impl Cluster {
28    /// Create a new cluster builder
29    pub fn builder() -> ClusterBuilder {
30        ClusterBuilder::default()
31    }
32
33    /// Connect to an existing cluster
34    ///
35    /// # Example
36    ///
37    /// ```no_run
38    /// # use polaris_core::prelude::*;
39    /// # async fn example() -> PolarisResult<()> {
40    /// let cluster = Cluster::connect(["10.0.0.1:7001", "10.0.0.2:7001"]).await?;
41    /// # Ok(())
42    /// # }
43    /// ```
44    pub async fn connect<I, S>(seeds: I) -> PolarisResult<Self>
45    where
46        I: IntoIterator<Item = S>,
47        S: AsRef<str>,
48    {
49        let seed_list: Vec<String> = seeds.into_iter().map(|s| s.as_ref().to_string()).collect();
50
51        if seed_list.is_empty() {
52            return Err(PolarisError::InvalidConfig("No seed nodes provided".into()));
53        }
54
55        let config = ClusterConfig {
56            seed_nodes: seed_list.clone(),
57            ..Default::default()
58        };
59
60        let builder = ClusterBuilder::new().with_config(config);
61
62        // Connect to seed nodes
63        for seed in &seed_list {
64            tracing::info!(seed = %seed, "Connecting to seed node");
65            // In a real implementation, this would establish network connections
66        }
67
68        builder.build().await
69    }
70
71    /// Submit a task to the cluster
72    ///
73    /// # Example
74    ///
75    /// ```no_run
76    /// # use polaris_core::prelude::*;
77    /// # use bytes::Bytes;
78    /// # async fn example(cluster: Cluster) -> PolarisResult<()> {
79    /// let task = Task::new("my_task", Bytes::from("payload"));
80    /// let handle = cluster.submit(task).await?;
81    /// let result = handle.result().await?;
82    /// # Ok(())
83    /// # }
84    /// ```
85    pub async fn submit(&self, mut task: Task) -> PolarisResult<TaskHandle> {
86        tracing::info!(task_id = %task.id, task_name = %task.name, "Submitting task");
87        
88        // Record metrics
89        self.metrics.record_task_submitted();
90
91        // Get available nodes
92        let nodes = self.registry.available();
93        if nodes.is_empty() {
94            return Err(PolarisError::scheduling_failed("No available nodes"));
95        }
96
97        // Schedule the task
98        let node_id = self.scheduler.schedule(&task, &nodes).await?;
99
100        // Update task status
101        task.transition_to(TaskStatus::Scheduled)?;
102        task.metadata.node_id = Some(node_id.to_string());
103        task.metadata.scheduled_at = Some(chrono::Utc::now());
104
105        // Store task
106        self.tasks.insert(task.id, task.clone());
107
108        // Create handle
109        let handle = TaskHandle::new(task.id);
110        handle.update_status(TaskStatus::Scheduled);
111
112        // In local mode, execute immediately
113        if self.local_mode {
114            self.execute_task_local(task, handle.clone()).await?;
115        }
116
117        Ok(handle)
118    }
119
120    /// Execute a task locally (for local mode or testing)
121    async fn execute_task_local(&self, task: Task, handle: TaskHandle) -> PolarisResult<()> {
122        self.executor.submit(task, handle)
123    }
124
125    /// Submit multiple tasks and collect results
126    ///
127    /// # Example
128    ///
129    /// ```no_run
130    /// # use polaris_core::prelude::*;
131    /// # use bytes::Bytes;
132    /// # async fn example(cluster: Cluster) -> PolarisResult<()> {
133    /// let tasks = vec![
134    ///     Task::new("task1", Bytes::from("input1")),
135    ///     Task::new("task2", Bytes::from("input2")),
136    /// ];
137    /// let results = cluster.submit_batch(tasks).await?;
138    /// # Ok(())
139    /// # }
140    /// ```
141    pub async fn submit_batch(&self, tasks: Vec<Task>) -> PolarisResult<Vec<TaskHandle>> {
142        let mut handles = Vec::with_capacity(tasks.len());
143
144        for task in tasks {
145            let handle = self.submit(task).await?;
146            handles.push(handle);
147        }
148
149        Ok(handles)
150    }
151
152    /// Map over a list of tasks
153    /// 
154    /// Submit multiple tasks and collect their results in parallel.
155    /// 
156    /// # Example
157    /// 
158    /// ```no_run
159    /// # use polaris_core::prelude::*;
160    /// # use bytes::Bytes;
161    /// # async fn example(cluster: Cluster) -> PolarisResult<()> {
162    /// let tasks = vec![
163    ///     Task::new("task1", Bytes::from("input1")),
164    ///     Task::new("task2", Bytes::from("input2")),
165    /// ];
166    /// 
167    /// let results = cluster.map(tasks)
168    ///     .with_retry(5)
169    ///     .await_impl()
170    ///     .await?;
171    /// # Ok(())
172    /// # }
173    /// ```
174    pub fn map<T, I: IntoIterator<Item = Task>>(&self, tasks: I) -> MapBuilder<T> {
175        MapBuilder::new(self.clone(), tasks.into_iter().collect())
176    }
177
178    /// Get task status
179    pub fn task_status(&self, task_id: TaskId) -> Option<TaskStatus> {
180        self.tasks.get(&task_id).map(|t| t.status)
181    }
182
183    /// Get cluster statistics
184    pub fn stats(&self) -> ClusterStats {
185        let nodes = self.registry.all();
186        let total_nodes = nodes.len();
187        let available_nodes = nodes.iter().filter(|n| n.is_available()).count();
188
189        let total_tasks = self.tasks.len();
190        let running_tasks = self
191            .tasks
192            .iter()
193            .filter(|t| t.status == TaskStatus::Running)
194            .count();
195
196        ClusterStats {
197            total_nodes,
198            available_nodes,
199            total_tasks,
200            running_tasks,
201        }
202    }
203
204    /// Shutdown the cluster gracefully
205    pub async fn shutdown(&self) -> PolarisResult<()> {
206        tracing::info!("Shutting down cluster");
207
208        // Mark all nodes as draining
209        for node in self.registry.all() {
210            node.set_status(NodeStatus::Draining);
211        }
212
213        // Wait for running tasks to complete (with timeout)
214        let timeout = Duration::from_secs(30);
215        let start = std::time::Instant::now();
216
217        while start.elapsed() < timeout {
218            let running = self
219                .tasks
220                .iter()
221                .filter(|t| t.status == TaskStatus::Running)
222                .count();
223
224            if running == 0 {
225                break;
226            }
227
228            tokio::time::sleep(Duration::from_millis(100)).await;
229        }
230
231        tracing::info!("Cluster shutdown complete");
232        Ok(())
233    }
234}
235
236impl std::fmt::Debug for Cluster {
237    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
238        f.debug_struct("Cluster")
239            .field("config", &self.config)
240            .field("node_count", &self.registry.count())
241            .field("task_count", &self.tasks.len())
242            .finish()
243    }
244}
245
246/// Builder for creating a cluster
247#[derive(Default)]
248pub struct ClusterBuilder {
249    config: Option<ClusterConfig>,
250    scheduler: Option<Arc<dyn Scheduler>>,
251    local_mode: bool,
252}
253
254impl ClusterBuilder {
255    /// Create a new cluster builder
256    pub fn new() -> Self {
257        Self::default()
258    }
259
260    /// Set cluster configuration
261    pub fn with_config(mut self, config: ClusterConfig) -> Self {
262        self.config = Some(config);
263        self
264    }
265
266    /// Set custom scheduler
267    pub fn with_scheduler<S: Scheduler + 'static>(mut self, scheduler: S) -> Self {
268        self.scheduler = Some(Arc::new(scheduler));
269        self
270    }
271
272    /// Enable local mode (single node, in-process execution)
273    pub fn with_local_node(mut self) -> Self {
274        self.local_mode = true;
275        self
276    }
277
278    /// Build the cluster
279    pub async fn build(self) -> PolarisResult<Cluster> {
280        let config = self.config.unwrap_or_default();
281        config.validate()?;
282
283        let registry = NodeRegistry::new();
284
285        // Create local node if in local mode
286        if self.local_mode {
287            let node_config = NodeConfig::default();
288            let node = Node::new("local-node", node_config.bind_addr);
289            node.set_status(NodeStatus::Ready);
290            registry.register(node)?;
291            tracing::info!("Started cluster in local mode");
292        }
293
294        let scheduler = self
295            .scheduler
296            .unwrap_or_else(|| Arc::new(RoundRobinScheduler::new()));
297
298        Ok(Cluster {
299            config: Arc::new(config),
300            registry,
301            scheduler,
302            executor: Executor::new(),
303            metrics: MetricsCollector::new(),
304            tasks: Arc::new(dashmap::DashMap::new()),
305            local_mode: self.local_mode,
306        })
307    }
308}
309
/// Point-in-time snapshot of cluster node and task counts.
///
/// Derives `Default`, `PartialEq`, and `Eq` so snapshots can be compared
/// directly (e.g. in tests) and built as an all-zero baseline.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ClusterStats {
    /// Total number of nodes
    pub total_nodes: usize,
    /// Number of available nodes
    pub available_nodes: usize,
    /// Total tasks submitted
    pub total_tasks: usize,
    /// Currently running tasks
    pub running_tasks: usize,
}
322
323/// Builder for map operations
324/// 
325/// Allows batching multiple tasks with retry configuration.
326pub struct MapBuilder<T> {
327    cluster: Cluster,
328    tasks: Vec<Task>,
329    retry_count: u32,
330    _phantom: std::marker::PhantomData<T>,
331}
332
333impl<T> MapBuilder<T> {
334    fn new(cluster: Cluster, tasks: Vec<Task>) -> Self {
335        Self {
336            cluster,
337            tasks,
338            retry_count: 3,
339            _phantom: std::marker::PhantomData,
340        }
341    }
342
343    /// Set retry count for failed tasks
344    /// 
345    /// Each task will be retried up to this many times if it fails.
346    pub fn with_retry(mut self, count: u32) -> Self {
347        self.retry_count = count;
348        self
349    }
350
351    /// Execute the map operation
352    /// 
353    /// Submits all tasks concurrently, waits for results, and retries failures.
354    /// 
355    /// # Returns
356    /// 
357    /// Returns task results in the order they were submitted.
358    /// Note: Type parameter T is currently not used for deserialization.
359    pub async fn await_impl(self) -> PolarisResult<Vec<TaskResult>> {
360        use std::time::Duration;
361        
362        if self.tasks.is_empty() {
363            return Ok(Vec::new());
364        }
365
366        tracing::info!(
367            task_count = self.tasks.len(),
368            retry_count = self.retry_count,
369            "Starting map operation"
370        );
371
372        let mut results = Vec::with_capacity(self.tasks.len());
373        let mut handles = Vec::with_capacity(self.tasks.len());
374
375        // Submit all tasks
376        for mut task in self.tasks {
377            // Configure retry on the task itself
378            task.max_retries = self.retry_count;
379            
380            let handle = self.cluster.submit(task).await?;
381            handles.push(handle);
382        }
383
384        tracing::debug!("All tasks submitted, waiting for results");
385
386        // Collect results with timeout
387        for (idx, handle) in handles.into_iter().enumerate() {
388            // Wait for task completion
389            let result = handle.result().await?;
390            
391            if result.success {
392                tracing::debug!(task_idx = idx, "Task succeeded");
393            } else {
394                tracing::warn!(
395                    task_idx = idx,
396                    error = ?result.error,
397                    "Task failed after retries"
398                );
399            }
400            
401            results.push(result);
402        }
403
404        let success_count = results.iter().filter(|r| r.success).count();
405        let failure_count = results.len() - success_count;
406
407        tracing::info!(
408            total = results.len(),
409            succeeded = success_count,
410            failed = failure_count,
411            "Map operation completed"
412        );
413
414        Ok(results)
415    }
416}
417
#[cfg(test)]
mod tests {
    use super::*;
    use crate::scheduler::LoadAwareScheduler;
    use bytes::Bytes;

    /// Build a cluster with a single in-process node.
    async fn local_cluster() -> Cluster {
        Cluster::builder()
            .with_local_node()
            .build()
            .await
            .expect("local cluster should build")
    }

    #[tokio::test]
    async fn test_cluster_builder_local() {
        let cluster = local_cluster().await;

        assert_eq!(cluster.registry.count(), 1);
        assert!(cluster.local_mode);
    }

    #[tokio::test]
    async fn test_cluster_submit_task() {
        let cluster = local_cluster().await;

        let task = Task::new("test_task", Bytes::from("payload"));
        let handle = cluster.submit(task).await.unwrap();

        assert!(!handle.id.as_uuid().is_nil());
    }

    #[tokio::test]
    async fn test_cluster_submit_records_task() {
        let cluster = local_cluster().await;

        let handle = cluster
            .submit(Task::new("tracked_task", Bytes::new()))
            .await
            .unwrap();

        // `submit` stores the task in the cluster's task table.
        assert!(cluster.task_status(handle.id).is_some());
        assert_eq!(cluster.stats().total_tasks, 1);
    }

    #[tokio::test]
    async fn test_cluster_map_operation() {
        let cluster = local_cluster().await;

        // Create multiple tasks
        let tasks = vec![
            Task::new("map_task_1", Bytes::from("input1")),
            Task::new("map_task_2", Bytes::from("input2")),
            Task::new("map_task_3", Bytes::from("input3")),
        ];

        // Execute map operation with retry
        let results = cluster
            .map::<String, _>(tasks)
            .with_retry(2)
            .await_impl()
            .await
            .unwrap();

        // Verify results
        assert_eq!(results.len(), 3);
        assert!(results.iter().all(|r| r.success));
    }

    #[tokio::test]
    async fn test_cluster_map_empty() {
        let cluster = local_cluster().await;

        let results = cluster
            .map::<String, _>(Vec::<Task>::new())
            .await_impl()
            .await
            .unwrap();

        assert!(results.is_empty());
    }

    #[tokio::test]
    async fn test_cluster_submit_batch() {
        let cluster = local_cluster().await;

        let tasks = vec![
            Task::new("task1", Bytes::new()),
            Task::new("task2", Bytes::new()),
            Task::new("task3", Bytes::new()),
        ];

        let handles = cluster.submit_batch(tasks).await.unwrap();
        assert_eq!(handles.len(), 3);
    }

    #[tokio::test]
    async fn test_cluster_stats() {
        let cluster = local_cluster().await;

        let stats = cluster.stats();
        assert_eq!(stats.total_nodes, 1);
        assert_eq!(stats.available_nodes, 1);
    }

    #[tokio::test]
    async fn test_cluster_custom_scheduler() {
        let cluster = Cluster::builder()
            .with_local_node()
            .with_scheduler(LoadAwareScheduler::new())
            .build()
            .await
            .unwrap();

        let task = Task::new("test", Bytes::new());
        let result = cluster.submit(task).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_cluster_no_nodes_error() {
        let cluster = Cluster::builder().build().await.unwrap();

        let task = Task::new("test", Bytes::new());
        let result = cluster.submit(task).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_cluster_connect_validation() {
        let result = Cluster::connect::<_, &str>([]).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_cluster_shutdown() {
        let cluster = local_cluster().await;

        let result = cluster.shutdown().await;
        assert!(result.is_ok());
    }
}