1use crate::config::{ClusterConfig, NodeConfig};
6use crate::errors::{PolarisError, PolarisResult};
7use crate::executor::Executor;
8use crate::node::{Node, NodeRegistry, NodeStatus};
9use crate::observability::MetricsCollector;
10use crate::scheduler::{RoundRobinScheduler, Scheduler};
11use crate::task::{Task, TaskHandle, TaskId, TaskResult, TaskStatus};
12use std::sync::Arc;
13use std::time::Duration;
14
/// Handle to a Polaris cluster: node registry, scheduler, executor and task table.
///
/// `config`, `scheduler` and `tasks` live behind `Arc`, so clones of a `Cluster`
/// share them. Whether `registry`, `executor` and `metrics` are shared across clones
/// depends on their own `Clone` impls (NOTE(review): confirm they are handle types).
#[derive(Clone)]
pub struct Cluster {
    /// Cluster configuration, checked with `validate()` by `ClusterBuilder::build`.
    config: Arc<ClusterConfig>,
    /// Nodes known to this cluster; `submit` only schedules onto `available()` ones.
    registry: NodeRegistry,
    /// Strategy used by `submit` to pick a node for each task.
    scheduler: Arc<dyn Scheduler>,
    /// In-process executor that `submit` hands tasks to when `local_mode` is set.
    executor: Executor,
    /// Collector that `submit` reports submitted tasks to.
    metrics: MetricsCollector,
    /// Copy of each submitted task as recorded at scheduling time, keyed by id.
    /// NOTE(review): nothing in this file updates entries after insertion, so the
    /// stored status may stay `Scheduled` while the task actually runs.
    tasks: Arc<dashmap::DashMap<TaskId, Task>>,
    /// When true, `submit` immediately passes the task to `executor`.
    local_mode: bool,
}
26
27impl Cluster {
28 pub fn builder() -> ClusterBuilder {
30 ClusterBuilder::default()
31 }
32
33 pub async fn connect<I, S>(seeds: I) -> PolarisResult<Self>
45 where
46 I: IntoIterator<Item = S>,
47 S: AsRef<str>,
48 {
49 let seed_list: Vec<String> = seeds.into_iter().map(|s| s.as_ref().to_string()).collect();
50
51 if seed_list.is_empty() {
52 return Err(PolarisError::InvalidConfig("No seed nodes provided".into()));
53 }
54
55 let config = ClusterConfig {
56 seed_nodes: seed_list.clone(),
57 ..Default::default()
58 };
59
60 let builder = ClusterBuilder::new().with_config(config);
61
62 for seed in &seed_list {
64 tracing::info!(seed = %seed, "Connecting to seed node");
65 }
67
68 builder.build().await
69 }
70
71 pub async fn submit(&self, mut task: Task) -> PolarisResult<TaskHandle> {
86 tracing::info!(task_id = %task.id, task_name = %task.name, "Submitting task");
87
88 self.metrics.record_task_submitted();
90
91 let nodes = self.registry.available();
93 if nodes.is_empty() {
94 return Err(PolarisError::scheduling_failed("No available nodes"));
95 }
96
97 let node_id = self.scheduler.schedule(&task, &nodes).await?;
99
100 task.transition_to(TaskStatus::Scheduled)?;
102 task.metadata.node_id = Some(node_id.to_string());
103 task.metadata.scheduled_at = Some(chrono::Utc::now());
104
105 self.tasks.insert(task.id, task.clone());
107
108 let handle = TaskHandle::new(task.id);
110 handle.update_status(TaskStatus::Scheduled);
111
112 if self.local_mode {
114 self.execute_task_local(task, handle.clone()).await?;
115 }
116
117 Ok(handle)
118 }
119
120 async fn execute_task_local(&self, task: Task, handle: TaskHandle) -> PolarisResult<()> {
122 self.executor.submit(task, handle)
123 }
124
125 pub async fn submit_batch(&self, tasks: Vec<Task>) -> PolarisResult<Vec<TaskHandle>> {
142 let mut handles = Vec::with_capacity(tasks.len());
143
144 for task in tasks {
145 let handle = self.submit(task).await?;
146 handles.push(handle);
147 }
148
149 Ok(handles)
150 }
151
152 pub fn map<T, I: IntoIterator<Item = Task>>(&self, tasks: I) -> MapBuilder<T> {
175 MapBuilder::new(self.clone(), tasks.into_iter().collect())
176 }
177
178 pub fn task_status(&self, task_id: TaskId) -> Option<TaskStatus> {
180 self.tasks.get(&task_id).map(|t| t.status)
181 }
182
183 pub fn stats(&self) -> ClusterStats {
185 let nodes = self.registry.all();
186 let total_nodes = nodes.len();
187 let available_nodes = nodes.iter().filter(|n| n.is_available()).count();
188
189 let total_tasks = self.tasks.len();
190 let running_tasks = self
191 .tasks
192 .iter()
193 .filter(|t| t.status == TaskStatus::Running)
194 .count();
195
196 ClusterStats {
197 total_nodes,
198 available_nodes,
199 total_tasks,
200 running_tasks,
201 }
202 }
203
204 pub async fn shutdown(&self) -> PolarisResult<()> {
206 tracing::info!("Shutting down cluster");
207
208 for node in self.registry.all() {
210 node.set_status(NodeStatus::Draining);
211 }
212
213 let timeout = Duration::from_secs(30);
215 let start = std::time::Instant::now();
216
217 while start.elapsed() < timeout {
218 let running = self
219 .tasks
220 .iter()
221 .filter(|t| t.status == TaskStatus::Running)
222 .count();
223
224 if running == 0 {
225 break;
226 }
227
228 tokio::time::sleep(Duration::from_millis(100)).await;
229 }
230
231 tracing::info!("Cluster shutdown complete");
232 Ok(())
233 }
234}
235
236impl std::fmt::Debug for Cluster {
237 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
238 f.debug_struct("Cluster")
239 .field("config", &self.config)
240 .field("node_count", &self.registry.count())
241 .field("task_count", &self.tasks.len())
242 .finish()
243 }
244}
245
246#[derive(Default)]
248pub struct ClusterBuilder {
249 config: Option<ClusterConfig>,
250 scheduler: Option<Arc<dyn Scheduler>>,
251 local_mode: bool,
252}
253
254impl ClusterBuilder {
255 pub fn new() -> Self {
257 Self::default()
258 }
259
260 pub fn with_config(mut self, config: ClusterConfig) -> Self {
262 self.config = Some(config);
263 self
264 }
265
266 pub fn with_scheduler<S: Scheduler + 'static>(mut self, scheduler: S) -> Self {
268 self.scheduler = Some(Arc::new(scheduler));
269 self
270 }
271
272 pub fn with_local_node(mut self) -> Self {
274 self.local_mode = true;
275 self
276 }
277
278 pub async fn build(self) -> PolarisResult<Cluster> {
280 let config = self.config.unwrap_or_default();
281 config.validate()?;
282
283 let registry = NodeRegistry::new();
284
285 if self.local_mode {
287 let node_config = NodeConfig::default();
288 let node = Node::new("local-node", node_config.bind_addr);
289 node.set_status(NodeStatus::Ready);
290 registry.register(node)?;
291 tracing::info!("Started cluster in local mode");
292 }
293
294 let scheduler = self
295 .scheduler
296 .unwrap_or_else(|| Arc::new(RoundRobinScheduler::new()));
297
298 Ok(Cluster {
299 config: Arc::new(config),
300 registry,
301 scheduler,
302 executor: Executor::new(),
303 metrics: MetricsCollector::new(),
304 tasks: Arc::new(dashmap::DashMap::new()),
305 local_mode: self.local_mode,
306 })
307 }
308}
309
/// Point-in-time counters describing a cluster's nodes and tracked tasks.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ClusterStats {
    /// Number of registered nodes, regardless of status.
    pub total_nodes: usize,
    /// Registered nodes that reported themselves available when the snapshot was taken.
    pub available_nodes: usize,
    /// Number of tasks recorded in the cluster's task table.
    pub total_tasks: usize,
    /// Tracked tasks whose recorded status is `Running`.
    pub running_tasks: usize,
}
322
323pub struct MapBuilder<T> {
327 cluster: Cluster,
328 tasks: Vec<Task>,
329 retry_count: u32,
330 _phantom: std::marker::PhantomData<T>,
331}
332
333impl<T> MapBuilder<T> {
334 fn new(cluster: Cluster, tasks: Vec<Task>) -> Self {
335 Self {
336 cluster,
337 tasks,
338 retry_count: 3,
339 _phantom: std::marker::PhantomData,
340 }
341 }
342
343 pub fn with_retry(mut self, count: u32) -> Self {
347 self.retry_count = count;
348 self
349 }
350
351 pub async fn await_impl(self) -> PolarisResult<Vec<TaskResult>> {
360 use std::time::Duration;
361
362 if self.tasks.is_empty() {
363 return Ok(Vec::new());
364 }
365
366 tracing::info!(
367 task_count = self.tasks.len(),
368 retry_count = self.retry_count,
369 "Starting map operation"
370 );
371
372 let mut results = Vec::with_capacity(self.tasks.len());
373 let mut handles = Vec::with_capacity(self.tasks.len());
374
375 for mut task in self.tasks {
377 task.max_retries = self.retry_count;
379
380 let handle = self.cluster.submit(task).await?;
381 handles.push(handle);
382 }
383
384 tracing::debug!("All tasks submitted, waiting for results");
385
386 for (idx, handle) in handles.into_iter().enumerate() {
388 let result = handle.result().await?;
390
391 if result.success {
392 tracing::debug!(task_idx = idx, "Task succeeded");
393 } else {
394 tracing::warn!(
395 task_idx = idx,
396 error = ?result.error,
397 "Task failed after retries"
398 );
399 }
400
401 results.push(result);
402 }
403
404 let success_count = results.iter().filter(|r| r.success).count();
405 let failure_count = results.len() - success_count;
406
407 tracing::info!(
408 total = results.len(),
409 succeeded = success_count,
410 failed = failure_count,
411 "Map operation completed"
412 );
413
414 Ok(results)
415 }
416}
417
#[cfg(test)]
mod tests {
    use super::*;
    use crate::scheduler::LoadAwareScheduler;
    use bytes::Bytes;

    #[tokio::test]
    async fn test_cluster_builder_local() {
        let cluster = Cluster::builder().with_local_node().build().await.unwrap();

        assert_eq!(cluster.registry.count(), 1);
        assert!(cluster.local_mode);
    }

    #[tokio::test]
    async fn test_cluster_submit_task() {
        let cluster = Cluster::builder().with_local_node().build().await.unwrap();

        let task = Task::new("test_task", Bytes::from("payload"));
        let handle = cluster.submit(task).await.unwrap();

        assert!(!handle.id.as_uuid().is_nil());
    }

    #[tokio::test]
    async fn test_cluster_task_status_after_submit() {
        let cluster = Cluster::builder().with_local_node().build().await.unwrap();

        let task = Task::new("tracked_task", Bytes::new());
        let handle = cluster.submit(task).await.unwrap();

        // A submitted task must be recorded in the task table and counted in stats.
        assert!(cluster.task_status(handle.id).is_some());
        assert_eq!(cluster.stats().total_tasks, 1);
    }

    #[tokio::test]
    async fn test_cluster_map_operation() {
        let cluster = Cluster::builder().with_local_node().build().await.unwrap();

        let tasks = vec![
            Task::new("map_task_1", Bytes::from("input1")),
            Task::new("map_task_2", Bytes::from("input2")),
            Task::new("map_task_3", Bytes::from("input3")),
        ];

        let results = cluster
            .map::<String, _>(tasks)
            .with_retry(2)
            .await_impl()
            .await
            .unwrap();

        assert_eq!(results.len(), 3);
        assert!(results.iter().all(|r| r.success));
    }

    #[tokio::test]
    async fn test_cluster_map_empty() {
        // No nodes are registered: an empty map must short-circuit before scheduling.
        let cluster = Cluster::builder().build().await.unwrap();

        let results = cluster
            .map::<String, _>(Vec::<Task>::new())
            .await_impl()
            .await
            .unwrap();

        assert!(results.is_empty());
    }

    #[tokio::test]
    async fn test_cluster_submit_batch() {
        let cluster = Cluster::builder().with_local_node().build().await.unwrap();

        let tasks = vec![
            Task::new("task1", Bytes::new()),
            Task::new("task2", Bytes::new()),
            Task::new("task3", Bytes::new()),
        ];

        let handles = cluster.submit_batch(tasks).await.unwrap();
        assert_eq!(handles.len(), 3);
    }

    #[tokio::test]
    async fn test_cluster_stats() {
        let cluster = Cluster::builder().with_local_node().build().await.unwrap();

        let stats = cluster.stats();
        assert_eq!(stats.total_nodes, 1);
        assert_eq!(stats.available_nodes, 1);
    }

    #[tokio::test]
    async fn test_cluster_custom_scheduler() {
        let cluster = Cluster::builder()
            .with_local_node()
            .with_scheduler(LoadAwareScheduler::new())
            .build()
            .await
            .unwrap();

        let task = Task::new("test", Bytes::new());
        let result = cluster.submit(task).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_cluster_no_nodes_error() {
        let cluster = Cluster::builder().build().await.unwrap();

        let task = Task::new("test", Bytes::new());
        let result = cluster.submit(task).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_cluster_connect_validation() {
        let result = Cluster::connect::<_, &str>([]).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_cluster_shutdown() {
        let cluster = Cluster::builder().with_local_node().build().await.unwrap();

        let result = cluster.shutdown().await;
        assert!(result.is_ok());
    }
}