1use std::collections::{BTreeMap, HashMap, HashSet};
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18use parking_lot::RwLock;
19use serde::{Deserialize, Serialize};
20use thiserror::Error;
21use tokio::sync::mpsc;
22use tracing::{debug, info};
23
/// Errors produced by the distributed streaming components in this module.
#[derive(Error, Debug)]
pub enum DistributedStreamError {
    /// The referenced node id is not registered with the router/topology.
    #[error("Node not found: {node_id}")]
    NodeNotFound { node_id: String },

    /// The hash ring is empty, so no routing target exists.
    #[error("No nodes available in topology")]
    NoNodesAvailable,

    /// A partition could not be assigned to a node.
    /// NOTE(review): not constructed in the visible code — confirm intended use.
    #[error("Partition assignment failed: {reason}")]
    PartitionAssignmentFailed { reason: String },

    /// Failure while merging partial window results.
    #[error("Window aggregation error: {0}")]
    WindowAggregation(String),

    /// Failure while submitting, cancelling, or looking up a job.
    #[error("Job distribution error: {0}")]
    JobDistribution(String),

    /// A channel send failed.
    /// NOTE(review): not constructed in the visible code — change events are
    /// sent best-effort with errors ignored; confirm intended use.
    #[error("Channel send error: {0}")]
    ChannelSend(String),

    /// The topology's internal state is inconsistent.
    /// NOTE(review): not constructed in the visible code — confirm intended use.
    #[error("Topology inconsistency: {0}")]
    TopologyInconsistency(String),
}

/// Convenience alias for results in this module.
pub type DistributedResult<T> = Result<T, DistributedStreamError>;
53
/// Health state of a stream node as seen by the topology.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum NodeStatus {
    /// Fully operational and accepting work.
    Healthy,
    /// Impaired but still routable (see `StreamNode::is_available`).
    Degraded,
    /// Not responding; excluded from routing.
    Unreachable,
    /// Being drained prior to removal; excluded from new work.
    Draining,
}
68
/// A worker node participating in the distributed stream topology.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamNode {
    /// Unique identifier for this node.
    pub node_id: String,
    /// Network address (e.g. `host:port`) used to reach the node.
    pub address: String,
    /// Current health status.
    pub status: NodeStatus,
    /// Number of partitions currently assigned to this node.
    pub assigned_partitions: usize,
    /// Remaining capacity fraction; a node with capacity 0.0 is not routable.
    pub capacity: f64,
    /// Wall-clock time of the last heartbeat received from this node.
    pub last_heartbeat: std::time::SystemTime,
    /// Relative routing weight (defaults to 100).
    /// NOTE(review): not read by the visible routing code — confirm semantics.
    pub weight: u32,
}
87
88impl StreamNode {
89 pub fn new(node_id: impl Into<String>, address: impl Into<String>) -> Self {
91 Self {
92 node_id: node_id.into(),
93 address: address.into(),
94 status: NodeStatus::Healthy,
95 assigned_partitions: 0,
96 capacity: 1.0,
97 last_heartbeat: std::time::SystemTime::now(),
98 weight: 100,
99 }
100 }
101
102 pub fn is_available(&self) -> bool {
104 matches!(self.status, NodeStatus::Healthy | NodeStatus::Degraded) && self.capacity > 0.0
105 }
106}
107
/// A single point on the consistent-hash ring.
///
/// Each physical node contributes many such points so keys spread evenly.
#[derive(Debug, Clone)]
struct VirtualNode {
    /// Position on the ring: FNV-1a hash of `"node_id#index"`.
    hash: u64,
    /// Id of the physical node this point belongs to.
    node_id: String,
}
118
/// Consistent-hash router that maps partition keys onto stream nodes.
pub struct ConsistentHashRouter {
    /// Virtual nodes sorted ascending by hash.
    ring: Vec<VirtualNode>,
    /// Physical nodes keyed by node id.
    nodes: HashMap<String, StreamNode>,
    /// How many virtual nodes each physical node places on the ring.
    virtual_nodes_per_node: usize,
}
132
133impl ConsistentHashRouter {
134 pub fn new(virtual_nodes_per_node: usize) -> Self {
136 Self {
137 ring: Vec::new(),
138 nodes: HashMap::new(),
139 virtual_nodes_per_node,
140 }
141 }
142
143 pub fn add_node(&mut self, node: StreamNode) {
145 let node_id = node.node_id.clone();
146 for i in 0..self.virtual_nodes_per_node {
147 let key = format!("{}#{}", node_id, i);
148 let hash = self.fnv1a_hash(key.as_bytes());
149 self.ring.push(VirtualNode {
150 hash,
151 node_id: node_id.clone(),
152 });
153 }
154 self.ring.sort_by_key(|v| v.hash);
155 self.nodes.insert(node_id, node);
156 debug!(
157 "Added node to ring, total virtual nodes: {}",
158 self.ring.len()
159 );
160 }
161
162 pub fn remove_node(&mut self, node_id: &str) -> DistributedResult<()> {
164 if !self.nodes.contains_key(node_id) {
165 return Err(DistributedStreamError::NodeNotFound {
166 node_id: node_id.to_string(),
167 });
168 }
169 self.ring.retain(|v| v.node_id != node_id);
170 self.nodes.remove(node_id);
171 info!("Removed node {} from ring", node_id);
172 Ok(())
173 }
174
175 pub fn route(&self, partition_key: &str) -> DistributedResult<&StreamNode> {
177 if self.ring.is_empty() {
178 return Err(DistributedStreamError::NoNodesAvailable);
179 }
180 let hash = self.fnv1a_hash(partition_key.as_bytes());
181 let start_idx = self.find_ring_position(hash);
182 for offset in 0..self.ring.len() {
184 let candidate_id = &self.ring[(start_idx + offset) % self.ring.len()].node_id;
185 if let Some(node) = self.nodes.get(candidate_id) {
186 if node.is_available() {
187 return Ok(node);
188 }
189 }
190 }
191 let fallback_id = &self.ring[start_idx].node_id;
193 self.nodes
194 .get(fallback_id)
195 .ok_or_else(|| DistributedStreamError::NodeNotFound {
196 node_id: fallback_id.clone(),
197 })
198 }
199
200 pub fn nodes(&self) -> impl Iterator<Item = &StreamNode> {
202 self.nodes.values()
203 }
204
205 pub fn node_count(&self) -> usize {
207 self.nodes.len()
208 }
209
210 fn fnv1a_hash(&self, data: &[u8]) -> u64 {
212 const FNV_OFFSET: u64 = 14695981039346656037;
213 const FNV_PRIME: u64 = 1099511628211;
214 let mut hash = FNV_OFFSET;
215 for byte in data {
216 hash ^= u64::from(*byte);
217 hash = hash.wrapping_mul(FNV_PRIME);
218 }
219 hash
220 }
221
222 fn find_ring_position(&self, hash: u64) -> usize {
224 match self.ring.binary_search_by_key(&hash, |v| v.hash) {
225 Ok(idx) => idx,
226 Err(idx) => idx % self.ring.len(),
227 }
228 }
229}
230
/// A single node's contribution to one aggregation window.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartialWindowResult {
    /// Identifier of the window this partial belongs to.
    pub window_id: u64,
    /// Node that produced this partial.
    pub source_node: String,
    /// Number of events the node observed in the window.
    pub event_count: u64,
    /// Sum of the node's values in the window.
    pub partial_sum: f64,
    /// Minimum value the node observed.
    pub partial_min: f64,
    /// Maximum value the node observed.
    pub partial_max: f64,
    /// Whether the node considers its own window closed.
    pub is_complete: bool,
}
251
/// Cluster-wide aggregate of all partial results for one window.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AggregatedWindowResult {
    /// Identifier of the aggregated window.
    pub window_id: u64,
    /// Total events across all contributing nodes.
    pub total_events: u64,
    /// Sum of all partial sums.
    pub global_sum: f64,
    /// Minimum across all partial minima.
    pub global_min: f64,
    /// Maximum across all partial maxima.
    pub global_max: f64,
    /// `global_sum / total_events`, or 0.0 when there were no events.
    pub global_avg: f64,
    /// How many nodes contributed partials.
    pub contributing_nodes: usize,
    /// True when every expected node contributed before aggregation.
    pub is_complete: bool,
}
272
/// Merges per-node partial window results into cluster-wide aggregates.
pub struct DistributedWindowAggregator {
    /// Node ids that must all contribute before a window is complete.
    expected_nodes: HashSet<String>,
    /// Pending partials keyed by (window_id, source_node).
    partial_results: Arc<RwLock<HashMap<(u64, String), PartialWindowResult>>>,
    /// Finished aggregates ordered by window id.
    completed: Arc<RwLock<BTreeMap<u64, AggregatedWindowResult>>>,
    /// How long callers should wait before force-aggregating a window.
    timeout: Duration,
}
287
288impl DistributedWindowAggregator {
289 pub fn new(expected_nodes: HashSet<String>, timeout: Duration) -> Self {
291 Self {
292 expected_nodes,
293 partial_results: Arc::new(RwLock::new(HashMap::new())),
294 completed: Arc::new(RwLock::new(BTreeMap::new())),
295 timeout,
296 }
297 }
298
299 pub fn submit_partial(
304 &self,
305 partial: PartialWindowResult,
306 ) -> DistributedResult<Option<AggregatedWindowResult>> {
307 let window_id = partial.window_id;
308 {
309 let mut results = self.partial_results.write();
310 results.insert((window_id, partial.source_node.clone()), partial);
311 }
312 self.try_aggregate(window_id)
313 }
314
315 pub fn force_aggregate(&self, window_id: u64) -> DistributedResult<AggregatedWindowResult> {
317 self.merge_partials(window_id, false)
318 }
319
320 pub fn get_completed(&self, window_id: u64) -> Option<AggregatedWindowResult> {
322 self.completed.read().get(&window_id).cloned()
323 }
324
325 pub fn drain_completed(&self) -> Vec<AggregatedWindowResult> {
327 let mut completed = self.completed.write();
328 let keys: Vec<u64> = completed.keys().cloned().collect();
329 keys.into_iter()
330 .filter_map(|k| completed.remove(&k))
331 .collect()
332 }
333
334 pub fn timeout(&self) -> Duration {
336 self.timeout
337 }
338
339 fn try_aggregate(&self, window_id: u64) -> DistributedResult<Option<AggregatedWindowResult>> {
340 let contributing: HashSet<String> = {
341 let results = self.partial_results.read();
342 results
343 .keys()
344 .filter(|(wid, _)| *wid == window_id)
345 .map(|(_, nid)| nid.clone())
346 .collect()
347 };
348 if contributing == self.expected_nodes {
349 let agg = self.merge_partials(window_id, true)?;
350 Ok(Some(agg))
351 } else {
352 Ok(None)
353 }
354 }
355
356 fn merge_partials(
357 &self,
358 window_id: u64,
359 all_present: bool,
360 ) -> DistributedResult<AggregatedWindowResult> {
361 let results = self.partial_results.read();
362 let partials: Vec<&PartialWindowResult> = results
363 .iter()
364 .filter(|((wid, _), _)| *wid == window_id)
365 .map(|(_, v)| v)
366 .collect();
367
368 if partials.is_empty() {
369 return Err(DistributedStreamError::WindowAggregation(format!(
370 "No partial results for window {}",
371 window_id
372 )));
373 }
374
375 let total_events = partials.iter().map(|p| p.event_count).sum::<u64>();
376 let global_sum = partials.iter().map(|p| p.partial_sum).sum::<f64>();
377 let global_min = partials
378 .iter()
379 .map(|p| p.partial_min)
380 .fold(f64::INFINITY, f64::min);
381 let global_max = partials
382 .iter()
383 .map(|p| p.partial_max)
384 .fold(f64::NEG_INFINITY, f64::max);
385 let global_avg = if total_events > 0 {
386 global_sum / total_events as f64
387 } else {
388 0.0
389 };
390
391 let agg = AggregatedWindowResult {
392 window_id,
393 total_events,
394 global_sum,
395 global_min,
396 global_max,
397 global_avg,
398 contributing_nodes: partials.len(),
399 is_complete: all_present,
400 };
401 self.completed.write().insert(window_id, agg.clone());
402 Ok(agg)
403 }
404}
405
/// Tunables for the distributed stream topology.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopologyConfig {
    /// Upper bound on partitions per node.
    /// NOTE(review): not enforced by the visible code — confirm.
    pub max_partitions_per_node: usize,
    /// Interval between node heartbeats.
    pub heartbeat_interval: Duration,
    /// Time without a heartbeat after which a node is considered dead.
    pub node_timeout: Duration,
    /// Virtual nodes per physical node on the hash ring.
    pub virtual_nodes: usize,
    /// Desired number of replicas per partition.
    /// NOTE(review): not used by the visible routing code — confirm.
    pub replication_factor: usize,
}
422
impl Default for TopologyConfig {
    /// Defaults: 64 partitions/node, 5 s heartbeats, 30 s node timeout,
    /// 150 virtual nodes per physical node, replication factor 2.
    fn default() -> Self {
        Self {
            max_partitions_per_node: 64,
            heartbeat_interval: Duration::from_secs(5),
            node_timeout: Duration::from_secs(30),
            virtual_nodes: 150,
            replication_factor: 2,
        }
    }
}
434
/// Point-in-time statistics about the topology.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopologyStats {
    /// Total registered nodes.
    pub total_nodes: usize,
    /// Nodes currently in `Healthy` status.
    pub healthy_nodes: usize,
    /// Partitions with an assigned owner.
    pub total_partitions: usize,
    /// Mean partitions per node (0.0 when there are no nodes).
    pub avg_partitions_per_node: f64,
    /// When this snapshot was taken.
    pub snapshot_time: std::time::SystemTime,
}
449
/// Events emitted on the change channel when the cluster topology changes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TopologyChange {
    /// A node joined the topology (payload: node id).
    NodeAdded(String),
    /// A node left the topology (payload: node id).
    NodeRemoved(String),
    /// Ownership of a partition moved between nodes.
    PartitionReassigned {
        partition: u32,
        from: String,
        to: String,
    },
    /// A node's health status changed.
    /// NOTE(review): not emitted anywhere in the visible code — confirm.
    NodeStatusChanged { node_id: String, status: NodeStatus },
}
466
/// Cluster topology: node membership, partition ownership, and change events.
pub struct DistributedStreamTopology {
    /// Static configuration (intervals, limits, virtual-node count).
    config: TopologyConfig,
    /// Consistent-hash router over the member nodes.
    router: Arc<RwLock<ConsistentHashRouter>>,
    /// Current owner node id for each partition.
    partition_map: Arc<RwLock<HashMap<u32, String>>>,
    /// Total number of partitions managed by this topology.
    total_partitions: u32,
    /// Sender half of the topology-change event channel.
    change_tx: mpsc::Sender<TopologyChange>,
    /// Receiver half, shared so multiple tasks can take turns consuming.
    change_rx: Arc<tokio::sync::Mutex<mpsc::Receiver<TopologyChange>>>,
}
483
484impl DistributedStreamTopology {
485 pub fn new(config: TopologyConfig, total_partitions: u32) -> Self {
487 let (change_tx, change_rx) = mpsc::channel(1024);
488 Self {
489 config: config.clone(),
490 router: Arc::new(RwLock::new(ConsistentHashRouter::new(config.virtual_nodes))),
491 partition_map: Arc::new(RwLock::new(HashMap::new())),
492 total_partitions,
493 change_tx,
494 change_rx: Arc::new(tokio::sync::Mutex::new(change_rx)),
495 }
496 }
497
498 pub async fn add_node(&self, node: StreamNode) -> DistributedResult<()> {
500 let node_id = node.node_id.clone();
501 info!("Adding node {} to topology", node_id);
502 self.router.write().add_node(node);
503 self.rebalance_partitions()?;
504 let _ = self
505 .change_tx
506 .send(TopologyChange::NodeAdded(node_id))
507 .await;
508 Ok(())
509 }
510
511 pub async fn remove_node(&self, node_id: &str) -> DistributedResult<()> {
513 info!("Removing node {} from topology", node_id);
514 self.router.write().remove_node(node_id)?;
515 let reassigned = self.reassign_from_node(node_id)?;
516 for (partition, to) in reassigned {
517 let _ = self
518 .change_tx
519 .send(TopologyChange::PartitionReassigned {
520 partition,
521 from: node_id.to_string(),
522 to,
523 })
524 .await;
525 }
526 let _ = self
527 .change_tx
528 .send(TopologyChange::NodeRemoved(node_id.to_string()))
529 .await;
530 Ok(())
531 }
532
533 pub fn route(&self, partition_key: &str) -> DistributedResult<String> {
535 self.router
536 .read()
537 .route(partition_key)
538 .map(|n| n.node_id.clone())
539 }
540
541 pub fn stats(&self) -> TopologyStats {
543 let router = self.router.read();
544 let total_nodes = router.node_count();
545 let healthy_nodes = router
546 .nodes()
547 .filter(|n| n.status == NodeStatus::Healthy)
548 .count();
549 let partition_map = self.partition_map.read();
550 let total_partitions = partition_map.len();
551 let avg_partitions_per_node = if total_nodes > 0 {
552 total_partitions as f64 / total_nodes as f64
553 } else {
554 0.0
555 };
556 TopologyStats {
557 total_nodes,
558 healthy_nodes,
559 total_partitions,
560 avg_partitions_per_node,
561 snapshot_time: std::time::SystemTime::now(),
562 }
563 }
564
565 pub fn change_receiver(&self) -> Arc<tokio::sync::Mutex<mpsc::Receiver<TopologyChange>>> {
567 Arc::clone(&self.change_rx)
568 }
569
570 fn rebalance_partitions(&self) -> DistributedResult<()> {
571 let mut partition_map = self.partition_map.write();
572 let router = self.router.read();
573 for partition in 0..self.total_partitions {
574 let key = partition.to_string();
575 let node_id = router.route(&key)?.node_id.clone();
576 partition_map.insert(partition, node_id);
577 }
578 debug!("Rebalanced {} partitions", self.total_partitions);
579 Ok(())
580 }
581
582 fn reassign_from_node(&self, removed_node_id: &str) -> DistributedResult<Vec<(u32, String)>> {
583 let mut partition_map = self.partition_map.write();
584 let router = self.router.read();
585 let mut reassigned = Vec::new();
586
587 for (partition, node_id) in partition_map.iter_mut() {
588 if node_id.as_str() == removed_node_id {
589 let key = partition.to_string();
590 let new_node = router.route(&key)?.node_id.clone();
591 reassigned.push((*partition, new_node.clone()));
592 *node_id = new_node;
593 }
594 }
595 Ok(reassigned)
596 }
597}
598
/// A streaming job: a named set of partitions and their node assignments.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamJob {
    /// Unique job identifier.
    pub job_id: String,
    /// Human-readable job name.
    pub name: String,
    /// Partitions the job consumes.
    pub partitions: Vec<u32>,
    /// Owner node id per partition, filled in at submission time.
    pub node_assignments: HashMap<u32, String>,
    /// Creation timestamp.
    pub created_at: std::time::SystemTime,
    /// False once the job has been cancelled.
    pub is_active: bool,
}
617
618impl StreamJob {
619 pub fn new(job_id: impl Into<String>, name: impl Into<String>, partitions: Vec<u32>) -> Self {
621 Self {
622 job_id: job_id.into(),
623 name: name.into(),
624 partitions,
625 node_assignments: HashMap::new(),
626 created_at: std::time::SystemTime::now(),
627 is_active: true,
628 }
629 }
630}
631
/// Point-in-time statistics for the job coordinator.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CoordinatorStats {
    /// All tracked jobs, active or cancelled.
    pub total_jobs: usize,
    /// Jobs currently active.
    pub active_jobs: usize,
    /// Sum of partition counts across active jobs.
    pub total_partitions_managed: usize,
    /// Number of nodes in the underlying topology.
    pub cluster_size: usize,
}
644
/// Coordinates stream jobs across the cluster topology.
pub struct ClusterStreamCoordinator {
    /// Topology used to route partitions to nodes.
    topology: Arc<DistributedStreamTopology>,
    /// All known jobs keyed by job id.
    jobs: Arc<RwLock<HashMap<String, StreamJob>>>,
    /// When this coordinator was created (basis for `uptime`).
    start_time: Instant,
}
654
655impl ClusterStreamCoordinator {
656 pub fn new(topology: Arc<DistributedStreamTopology>) -> Self {
658 Self {
659 topology,
660 jobs: Arc::new(RwLock::new(HashMap::new())),
661 start_time: Instant::now(),
662 }
663 }
664
665 pub async fn submit_job(&self, mut job: StreamJob) -> DistributedResult<String> {
669 let job_id = job.job_id.clone();
670 info!(
671 "Submitting job {} with {} partitions",
672 job_id,
673 job.partitions.len()
674 );
675 for &partition in &job.partitions {
676 let key = partition.to_string();
677 let node_id = self.topology.route(&key)?;
678 job.node_assignments.insert(partition, node_id);
679 }
680 self.jobs.write().insert(job_id.clone(), job);
681 Ok(job_id)
682 }
683
684 pub fn cancel_job(&self, job_id: &str) -> DistributedResult<()> {
686 let mut jobs = self.jobs.write();
687 match jobs.get_mut(job_id) {
688 Some(job) => {
689 job.is_active = false;
690 info!("Cancelled job {}", job_id);
691 Ok(())
692 }
693 None => Err(DistributedStreamError::JobDistribution(format!(
694 "Job {} not found",
695 job_id
696 ))),
697 }
698 }
699
700 pub fn get_job(&self, job_id: &str) -> Option<StreamJob> {
702 self.jobs.read().get(job_id).cloned()
703 }
704
705 pub fn stats(&self) -> CoordinatorStats {
707 let jobs = self.jobs.read();
708 let active_jobs = jobs.values().filter(|j| j.is_active).count();
709 let total_partitions_managed = jobs
710 .values()
711 .filter(|j| j.is_active)
712 .map(|j| j.partitions.len())
713 .sum();
714 let topology_stats = self.topology.stats();
715 CoordinatorStats {
716 total_jobs: jobs.len(),
717 active_jobs,
718 total_partitions_managed,
719 cluster_size: topology_stats.total_nodes,
720 }
721 }
722
723 pub fn uptime(&self) -> Duration {
725 self.start_time.elapsed()
726 }
727
728 pub async fn rebalance_all_jobs(&self) -> DistributedResult<usize> {
732 let job_ids: Vec<String> = self
733 .jobs
734 .read()
735 .values()
736 .filter(|j| j.is_active)
737 .map(|j| j.job_id.clone())
738 .collect();
739 let count = job_ids.len();
740 for job_id in job_ids {
741 let mut jobs = self.jobs.write();
742 if let Some(job) = jobs.get_mut(&job_id) {
743 job.node_assignments.clear();
744 for &partition in &job.partitions.clone() {
745 let key = partition.to_string();
746 let node_id = self.topology.route(&key)?;
747 job.node_assignments.insert(partition, node_id);
748 }
749 }
750 }
751 info!("Rebalanced {} active jobs", count);
752 Ok(count)
753 }
754}
755
#[cfg(test)]
mod tests {
    use super::*;

    // Basic routing: with healthy nodes, every key resolves to some node.
    #[test]
    fn test_consistent_hash_router_basic() {
        let mut router = ConsistentHashRouter::new(10);
        router.add_node(StreamNode::new("node-1", "127.0.0.1:8001"));
        router.add_node(StreamNode::new("node-2", "127.0.0.1:8002"));
        router.add_node(StreamNode::new("node-3", "127.0.0.1:8003"));

        let node = router.route("partition-42").expect("route should succeed");
        assert!(!node.node_id.is_empty());
        assert_eq!(router.node_count(), 3);
    }

    // An empty ring must yield NoNodesAvailable, not a panic.
    #[test]
    fn test_consistent_hash_router_no_nodes() {
        let router = ConsistentHashRouter::new(10);
        let result = router.route("partition-1");
        assert!(matches!(
            result,
            Err(DistributedStreamError::NoNodesAvailable)
        ));
    }

    // Removing a known node shrinks the ring; removing an unknown id errors.
    #[test]
    fn test_consistent_hash_router_remove_node() {
        let mut router = ConsistentHashRouter::new(10);
        router.add_node(StreamNode::new("node-1", "127.0.0.1:8001"));
        router.add_node(StreamNode::new("node-2", "127.0.0.1:8002"));

        router.remove_node("node-1").expect("remove should succeed");
        assert_eq!(router.node_count(), 1);

        let result = router.remove_node("ghost-node");
        assert!(matches!(
            result,
            Err(DistributedStreamError::NodeNotFound { .. })
        ));
    }

    // With 150 virtual nodes each, 300 keys over 3 nodes should not be
    // badly skewed (every node gets at least ~7% of the keys).
    #[test]
    fn test_consistent_hash_distribution() {
        let mut router = ConsistentHashRouter::new(150);
        router.add_node(StreamNode::new("node-a", "10.0.0.1:9000"));
        router.add_node(StreamNode::new("node-b", "10.0.0.2:9000"));
        router.add_node(StreamNode::new("node-c", "10.0.0.3:9000"));

        let mut counts: HashMap<String, usize> = HashMap::new();
        for i in 0..300u32 {
            let key = format!("partition-{}", i);
            let node = router.route(&key).expect("route should succeed");
            *counts.entry(node.node_id.clone()).or_insert(0) += 1;
        }
        for (node_id, count) in &counts {
            assert!(
                *count > 20,
                "distribution too skewed for {}: got {}",
                node_id,
                count
            );
        }
    }

    // A window completes only once all three expected nodes have submitted;
    // the merged totals/min/max must match the hand-computed values.
    #[test]
    fn test_distributed_window_aggregator_full() {
        let expected: HashSet<String> = ["node-1", "node-2", "node-3"]
            .iter()
            .map(|s| s.to_string())
            .collect();
        let aggregator = DistributedWindowAggregator::new(expected, Duration::from_secs(10));

        let p1 = PartialWindowResult {
            window_id: 1000,
            source_node: "node-1".to_string(),
            event_count: 100,
            partial_sum: 50.0,
            partial_min: 1.0,
            partial_max: 10.0,
            is_complete: true,
        };
        let result = aggregator
            .submit_partial(p1)
            .expect("submit should succeed");
        assert!(result.is_none(), "should not complete with only 1/3 nodes");

        let p2 = PartialWindowResult {
            window_id: 1000,
            source_node: "node-2".to_string(),
            event_count: 200,
            partial_sum: 150.0,
            partial_min: 0.5,
            partial_max: 15.0,
            is_complete: true,
        };
        let result = aggregator
            .submit_partial(p2)
            .expect("submit should succeed");
        assert!(result.is_none(), "should not complete with only 2/3 nodes");

        let p3 = PartialWindowResult {
            window_id: 1000,
            source_node: "node-3".to_string(),
            event_count: 300,
            partial_sum: 300.0,
            partial_min: 0.1,
            partial_max: 20.0,
            is_complete: true,
        };
        let result = aggregator
            .submit_partial(p3)
            .expect("submit should succeed");
        assert!(result.is_some(), "should complete with all 3 nodes");

        let agg = result.expect("aggregate must be Some");
        assert_eq!(agg.window_id, 1000);
        assert_eq!(agg.total_events, 600);
        assert!((agg.global_sum - 500.0).abs() < 1e-9);
        assert!((agg.global_min - 0.1).abs() < 1e-9);
        assert!((agg.global_max - 20.0).abs() < 1e-9);
        assert!(agg.is_complete);
    }

    // force_aggregate produces an incomplete result from whatever partials
    // exist, without waiting for all expected nodes.
    #[test]
    fn test_distributed_window_force_aggregate() {
        let expected: HashSet<String> =
            ["node-1", "node-2"].iter().map(|s| s.to_string()).collect();
        let aggregator = DistributedWindowAggregator::new(expected, Duration::from_secs(10));

        let p = PartialWindowResult {
            window_id: 2000,
            source_node: "node-1".to_string(),
            event_count: 50,
            partial_sum: 25.0,
            partial_min: 2.0,
            partial_max: 8.0,
            is_complete: false,
        };
        aggregator.submit_partial(p).expect("submit should succeed");

        let agg = aggregator
            .force_aggregate(2000)
            .expect("force aggregate should succeed");
        assert_eq!(agg.window_id, 2000);
        assert_eq!(agg.total_events, 50);
        assert!(!agg.is_complete);
    }

    // drain_completed returns each completed window exactly once.
    #[test]
    fn test_distributed_window_drain_completed() {
        let expected: HashSet<String> = ["node-x"].iter().map(|s| s.to_string()).collect();
        let aggregator = DistributedWindowAggregator::new(expected, Duration::from_secs(5));

        let p = PartialWindowResult {
            window_id: 3000,
            source_node: "node-x".to_string(),
            event_count: 10,
            partial_sum: 5.0,
            partial_min: 1.0,
            partial_max: 2.0,
            is_complete: true,
        };
        aggregator.submit_partial(p).expect("submit should succeed");

        let drained = aggregator.drain_completed();
        assert_eq!(drained.len(), 1);
        assert_eq!(drained[0].window_id, 3000);

        // Second drain must be empty — results were removed, not copied.
        let empty = aggregator.drain_completed();
        assert!(empty.is_empty());
    }

    // Topology bookkeeping: partition count stays at total_partitions and
    // node count tracks add/remove.
    #[tokio::test]
    async fn test_distributed_stream_topology_add_remove() {
        let config = TopologyConfig::default();
        let topology = DistributedStreamTopology::new(config, 16);

        topology
            .add_node(StreamNode::new("node-a", "10.0.0.1:9000"))
            .await
            .expect("add node should succeed");
        topology
            .add_node(StreamNode::new("node-b", "10.0.0.2:9000"))
            .await
            .expect("add node should succeed");

        let stats = topology.stats();
        assert_eq!(stats.total_nodes, 2);
        assert_eq!(stats.total_partitions, 16);

        let routed = topology.route("test-key").expect("route should succeed");
        assert!(!routed.is_empty());

        topology
            .remove_node("node-a")
            .await
            .expect("remove should succeed");
        let stats = topology.stats();
        assert_eq!(stats.total_nodes, 1);
    }

    // Submitting a job assigns every partition and is reflected in stats.
    #[tokio::test]
    async fn test_cluster_stream_coordinator_submit_and_stats() {
        let config = TopologyConfig::default();
        let topology = Arc::new(DistributedStreamTopology::new(config, 32));
        topology
            .add_node(StreamNode::new("worker-1", "10.0.0.1:9000"))
            .await
            .expect("add node should succeed");
        topology
            .add_node(StreamNode::new("worker-2", "10.0.0.2:9000"))
            .await
            .expect("add node should succeed");

        let coordinator = ClusterStreamCoordinator::new(Arc::clone(&topology));
        let job = StreamJob::new("job-001", "sensor-stream", vec![0, 1, 2, 3, 4, 5, 6, 7]);
        let job_id = coordinator
            .submit_job(job)
            .await
            .expect("submit should succeed");
        assert_eq!(job_id, "job-001");

        let retrieved = coordinator.get_job("job-001").expect("job should exist");
        assert_eq!(retrieved.node_assignments.len(), 8);

        let stats = coordinator.stats();
        assert_eq!(stats.active_jobs, 1);
        assert_eq!(stats.total_partitions_managed, 8);
        assert_eq!(stats.cluster_size, 2);
    }

    // Cancelling keeps the job record but flips is_active; cancelling an
    // unknown job errors.
    #[tokio::test]
    async fn test_cluster_stream_coordinator_cancel() {
        let config = TopologyConfig::default();
        let topology = Arc::new(DistributedStreamTopology::new(config, 8));
        topology
            .add_node(StreamNode::new("n1", "localhost:9001"))
            .await
            .expect("add node");

        let coordinator = ClusterStreamCoordinator::new(Arc::clone(&topology));
        let job = StreamJob::new("job-cancel", "cancel-test", vec![0, 1]);
        coordinator
            .submit_job(job)
            .await
            .expect("submit should succeed");

        coordinator
            .cancel_job("job-cancel")
            .expect("cancel should succeed");
        let job = coordinator
            .get_job("job-cancel")
            .expect("job should still exist after cancel");
        assert!(!job.is_active);

        let result = coordinator.cancel_job("nonexistent");
        assert!(matches!(
            result,
            Err(DistributedStreamError::JobDistribution(_))
        ));
    }

    // Rebalancing after the topology changes touches every active job.
    #[tokio::test]
    async fn test_rebalance_jobs_after_node_change() {
        let config = TopologyConfig::default();
        let topology = Arc::new(DistributedStreamTopology::new(config, 16));
        topology
            .add_node(StreamNode::new("n1", "localhost:9001"))
            .await
            .expect("add node");
        topology
            .add_node(StreamNode::new("n2", "localhost:9002"))
            .await
            .expect("add node");

        let coordinator = ClusterStreamCoordinator::new(Arc::clone(&topology));
        let job = StreamJob::new("job-rebalance", "rebalance-test", (0u32..8).collect());
        coordinator
            .submit_job(job)
            .await
            .expect("submit should succeed");

        topology
            .add_node(StreamNode::new("n3", "localhost:9003"))
            .await
            .expect("add node");
        let rebalanced = coordinator
            .rebalance_all_jobs()
            .await
            .expect("rebalance should succeed");
        assert_eq!(rebalanced, 1);
    }
}