// oxirs_stream/distributed/mod.rs
1//! # Distributed Stream Processing
2//!
3//! Provides infrastructure for coordinating stream processing across multiple nodes
4//! in a cluster. This module implements consistent hashing for partition routing,
5//! distributed window aggregation, and cluster-wide job distribution.
6//!
7//! ## Components
8//!
9//! - [`DistributedStreamTopology`]: Coordinates stream processing across nodes
10//! - [`ConsistentHashRouter`]: Routes stream partitions to nodes via consistent hashing
11//! - [`DistributedWindowAggregator`]: Aggregates windowed results across cluster nodes
12//! - [`ClusterStreamCoordinator`]: Manages stream job distribution across the cluster
13
14use std::collections::{BTreeMap, HashMap, HashSet};
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18use parking_lot::RwLock;
19use serde::{Deserialize, Serialize};
20use thiserror::Error;
21use tokio::sync::mpsc;
22use tracing::{debug, info};
23
24// ─── Error Types ─────────────────────────────────────────────────────────────
25
/// Errors that can occur in distributed stream processing.
#[derive(Error, Debug)]
pub enum DistributedStreamError {
    /// The referenced node is not registered in the router/topology.
    #[error("Node not found: {node_id}")]
    NodeNotFound { node_id: String },

    /// The hash ring is empty, so no node can be selected.
    #[error("No nodes available in topology")]
    NoNodesAvailable,

    /// A partition could not be assigned to any node.
    #[error("Partition assignment failed: {reason}")]
    PartitionAssignmentFailed { reason: String },

    /// Failure while merging windowed partial results.
    #[error("Window aggregation error: {0}")]
    WindowAggregation(String),

    /// Failure while distributing or managing a stream job.
    #[error("Job distribution error: {0}")]
    JobDistribution(String),

    /// A coordination channel send failed.
    #[error("Channel send error: {0}")]
    ChannelSend(String),

    /// The topology state is internally inconsistent.
    #[error("Topology inconsistency: {0}")]
    TopologyInconsistency(String),
}
50
/// Result type for distributed stream operations.
pub type DistributedResult<T> = Result<T, DistributedStreamError>;
53
54// ─── Node Definitions ────────────────────────────────────────────────────────
55
/// Represents the health status of a cluster node.
///
/// `Healthy` and `Degraded` nodes are eligible to receive partitions (see
/// [`StreamNode::is_available`]); `Unreachable` and `Draining` nodes are not.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum NodeStatus {
    /// Node is healthy and processing.
    Healthy,
    /// Node is degraded but still operational.
    Degraded,
    /// Node is unreachable.
    Unreachable,
    /// Node is draining prior to removal from the cluster.
    Draining,
}
68
/// Metadata for a node participating in distributed stream processing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamNode {
    /// Unique identifier for this node.
    pub node_id: String,
    /// Network address of the node.
    pub address: String,
    /// Current health status.
    pub status: NodeStatus,
    /// Number of partitions assigned to this node.
    pub assigned_partitions: usize,
    /// Processing capacity (0.0 to 1.0); a node at 0.0 is never considered
    /// available for new partitions.
    pub capacity: f64,
    /// Timestamp of the last heartbeat received from this node.
    pub last_heartbeat: std::time::SystemTime,
    /// Node weight for load balancing (higher = more load).
    pub weight: u32,
}
87
88impl StreamNode {
89    /// Creates a new healthy stream node
90    pub fn new(node_id: impl Into<String>, address: impl Into<String>) -> Self {
91        Self {
92            node_id: node_id.into(),
93            address: address.into(),
94            status: NodeStatus::Healthy,
95            assigned_partitions: 0,
96            capacity: 1.0,
97            last_heartbeat: std::time::SystemTime::now(),
98            weight: 100,
99        }
100    }
101
102    /// Returns true if this node can accept new partitions
103    pub fn is_available(&self) -> bool {
104        matches!(self.status, NodeStatus::Healthy | NodeStatus::Degraded) && self.capacity > 0.0
105    }
106}
107
108// ─── Consistent Hash Router ──────────────────────────────────────────────────
109
/// Virtual node entry in the consistent hash ring.
#[derive(Debug, Clone)]
struct VirtualNode {
    /// Position of this virtual node on the 64-bit hash ring.
    hash: u64,
    /// ID of the physical node that owns this ring position.
    node_id: String,
}
118
/// Routes stream partitions to nodes using consistent hashing.
///
/// Uses a hash ring with configurable virtual nodes per physical node to
/// achieve even load distribution and minimal partition movement on topology
/// changes.
pub struct ConsistentHashRouter {
    /// Virtual nodes sorted by ring position (re-sorted on every insert).
    ring: Vec<VirtualNode>,
    /// Physical nodes indexed by node ID.
    nodes: HashMap<String, StreamNode>,
    /// Number of virtual nodes created per physical node.
    virtual_nodes_per_node: usize,
}
132
133impl ConsistentHashRouter {
134    /// Creates a new router with the specified number of virtual nodes per physical node.
135    pub fn new(virtual_nodes_per_node: usize) -> Self {
136        Self {
137            ring: Vec::new(),
138            nodes: HashMap::new(),
139            virtual_nodes_per_node,
140        }
141    }
142
143    /// Adds a node to the hash ring.
144    pub fn add_node(&mut self, node: StreamNode) {
145        let node_id = node.node_id.clone();
146        for i in 0..self.virtual_nodes_per_node {
147            let key = format!("{}#{}", node_id, i);
148            let hash = self.fnv1a_hash(key.as_bytes());
149            self.ring.push(VirtualNode {
150                hash,
151                node_id: node_id.clone(),
152            });
153        }
154        self.ring.sort_by_key(|v| v.hash);
155        self.nodes.insert(node_id, node);
156        debug!(
157            "Added node to ring, total virtual nodes: {}",
158            self.ring.len()
159        );
160    }
161
162    /// Removes a node from the hash ring.
163    pub fn remove_node(&mut self, node_id: &str) -> DistributedResult<()> {
164        if !self.nodes.contains_key(node_id) {
165            return Err(DistributedStreamError::NodeNotFound {
166                node_id: node_id.to_string(),
167            });
168        }
169        self.ring.retain(|v| v.node_id != node_id);
170        self.nodes.remove(node_id);
171        info!("Removed node {} from ring", node_id);
172        Ok(())
173    }
174
175    /// Routes a partition key to the responsible node.
176    pub fn route(&self, partition_key: &str) -> DistributedResult<&StreamNode> {
177        if self.ring.is_empty() {
178            return Err(DistributedStreamError::NoNodesAvailable);
179        }
180        let hash = self.fnv1a_hash(partition_key.as_bytes());
181        let start_idx = self.find_ring_position(hash);
182        // Walk the ring to find an available node
183        for offset in 0..self.ring.len() {
184            let candidate_id = &self.ring[(start_idx + offset) % self.ring.len()].node_id;
185            if let Some(node) = self.nodes.get(candidate_id) {
186                if node.is_available() {
187                    return Ok(node);
188                }
189            }
190        }
191        // Fallback: return the initial position even if not available
192        let fallback_id = &self.ring[start_idx].node_id;
193        self.nodes
194            .get(fallback_id)
195            .ok_or_else(|| DistributedStreamError::NodeNotFound {
196                node_id: fallback_id.clone(),
197            })
198    }
199
200    /// Returns an iterator over all known physical nodes.
201    pub fn nodes(&self) -> impl Iterator<Item = &StreamNode> {
202        self.nodes.values()
203    }
204
205    /// Returns the number of physical nodes.
206    pub fn node_count(&self) -> usize {
207        self.nodes.len()
208    }
209
210    /// FNV-1a 64-bit hash
211    fn fnv1a_hash(&self, data: &[u8]) -> u64 {
212        const FNV_OFFSET: u64 = 14695981039346656037;
213        const FNV_PRIME: u64 = 1099511628211;
214        let mut hash = FNV_OFFSET;
215        for byte in data {
216            hash ^= u64::from(*byte);
217            hash = hash.wrapping_mul(FNV_PRIME);
218        }
219        hash
220    }
221
222    /// Binary search for the first ring entry >= hash; wraps to 0 if past end.
223    fn find_ring_position(&self, hash: u64) -> usize {
224        match self.ring.binary_search_by_key(&hash, |v| v.hash) {
225            Ok(idx) => idx,
226            Err(idx) => idx % self.ring.len(),
227        }
228    }
229}
230
231// ─── Distributed Window Aggregator ───────────────────────────────────────────
232
/// A partial window result produced by a single node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartialWindowResult {
    /// Window identifier (start timestamp in ms).
    pub window_id: u64,
    /// Source node that produced this partial result.
    pub source_node: String,
    /// Number of events processed in this partial window.
    pub event_count: u64,
    /// Partial sum for numeric aggregations.
    pub partial_sum: f64,
    /// Partial minimum value.
    pub partial_min: f64,
    /// Partial maximum value.
    pub partial_max: f64,
    /// Whether this partial result is final for the producing node.
    pub is_complete: bool,
}
251
/// Aggregated result across all nodes for a window.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AggregatedWindowResult {
    /// Window identifier.
    pub window_id: u64,
    /// Total events across all contributing nodes.
    pub total_events: u64,
    /// Global sum (sum of partial sums).
    pub global_sum: f64,
    /// Global minimum across partial minima.
    pub global_min: f64,
    /// Global maximum across partial maxima.
    pub global_max: f64,
    /// Global average (sum / total_events; 0.0 when there are no events).
    pub global_avg: f64,
    /// Number of nodes that contributed partial results.
    pub contributing_nodes: usize,
    /// Whether all expected nodes contributed (false for force-flushed windows).
    pub is_complete: bool,
}
272
/// Aggregates windowed results from multiple cluster nodes.
///
/// Collects partial results from each node and merges them into a global
/// aggregate once all expected contributions arrive or a force-flush is triggered.
pub struct DistributedWindowAggregator {
    /// Node IDs expected to contribute to every window.
    expected_nodes: HashSet<String>,
    /// Partial results keyed by (window_id, node_id).
    partial_results: Arc<RwLock<HashMap<(u64, String), PartialWindowResult>>>,
    /// Completed aggregations keyed by window_id (BTreeMap keeps them ordered).
    completed: Arc<RwLock<BTreeMap<u64, AggregatedWindowResult>>>,
    /// Advisory timeout before callers should force-finalise incomplete windows.
    timeout: Duration,
}
287
288impl DistributedWindowAggregator {
289    /// Creates a new aggregator expecting results from the given nodes.
290    pub fn new(expected_nodes: HashSet<String>, timeout: Duration) -> Self {
291        Self {
292            expected_nodes,
293            partial_results: Arc::new(RwLock::new(HashMap::new())),
294            completed: Arc::new(RwLock::new(BTreeMap::new())),
295            timeout,
296        }
297    }
298
299    /// Submits a partial result from a node.
300    ///
301    /// Returns `Some(AggregatedWindowResult)` if all expected nodes have now
302    /// contributed for this window; otherwise returns `None`.
303    pub fn submit_partial(
304        &self,
305        partial: PartialWindowResult,
306    ) -> DistributedResult<Option<AggregatedWindowResult>> {
307        let window_id = partial.window_id;
308        {
309            let mut results = self.partial_results.write();
310            results.insert((window_id, partial.source_node.clone()), partial);
311        }
312        self.try_aggregate(window_id)
313    }
314
315    /// Forces aggregation for a window even if not all nodes have contributed.
316    pub fn force_aggregate(&self, window_id: u64) -> DistributedResult<AggregatedWindowResult> {
317        self.merge_partials(window_id, false)
318    }
319
320    /// Returns a completed aggregate for a specific window, if available.
321    pub fn get_completed(&self, window_id: u64) -> Option<AggregatedWindowResult> {
322        self.completed.read().get(&window_id).cloned()
323    }
324
325    /// Drains and returns all completed window results in ascending window order.
326    pub fn drain_completed(&self) -> Vec<AggregatedWindowResult> {
327        let mut completed = self.completed.write();
328        let keys: Vec<u64> = completed.keys().cloned().collect();
329        keys.into_iter()
330            .filter_map(|k| completed.remove(&k))
331            .collect()
332    }
333
334    /// Returns the configured aggregation timeout.
335    pub fn timeout(&self) -> Duration {
336        self.timeout
337    }
338
339    fn try_aggregate(&self, window_id: u64) -> DistributedResult<Option<AggregatedWindowResult>> {
340        let contributing: HashSet<String> = {
341            let results = self.partial_results.read();
342            results
343                .keys()
344                .filter(|(wid, _)| *wid == window_id)
345                .map(|(_, nid)| nid.clone())
346                .collect()
347        };
348        if contributing == self.expected_nodes {
349            let agg = self.merge_partials(window_id, true)?;
350            Ok(Some(agg))
351        } else {
352            Ok(None)
353        }
354    }
355
356    fn merge_partials(
357        &self,
358        window_id: u64,
359        all_present: bool,
360    ) -> DistributedResult<AggregatedWindowResult> {
361        let results = self.partial_results.read();
362        let partials: Vec<&PartialWindowResult> = results
363            .iter()
364            .filter(|((wid, _), _)| *wid == window_id)
365            .map(|(_, v)| v)
366            .collect();
367
368        if partials.is_empty() {
369            return Err(DistributedStreamError::WindowAggregation(format!(
370                "No partial results for window {}",
371                window_id
372            )));
373        }
374
375        let total_events = partials.iter().map(|p| p.event_count).sum::<u64>();
376        let global_sum = partials.iter().map(|p| p.partial_sum).sum::<f64>();
377        let global_min = partials
378            .iter()
379            .map(|p| p.partial_min)
380            .fold(f64::INFINITY, f64::min);
381        let global_max = partials
382            .iter()
383            .map(|p| p.partial_max)
384            .fold(f64::NEG_INFINITY, f64::max);
385        let global_avg = if total_events > 0 {
386            global_sum / total_events as f64
387        } else {
388            0.0
389        };
390
391        let agg = AggregatedWindowResult {
392            window_id,
393            total_events,
394            global_sum,
395            global_min,
396            global_max,
397            global_avg,
398            contributing_nodes: partials.len(),
399            is_complete: all_present,
400        };
401        self.completed.write().insert(window_id, agg.clone());
402        Ok(agg)
403    }
404}
405
406// ─── Distributed Stream Topology ─────────────────────────────────────────────
407
/// Configuration for a distributed stream topology.
///
/// NOTE(review): only `virtual_nodes` is consumed by the code visible in this
/// module; the remaining fields appear to be intended for health-checking and
/// replication logic elsewhere — confirm against their consumers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopologyConfig {
    /// Maximum number of partitions per node.
    pub max_partitions_per_node: usize,
    /// Heartbeat interval for node health checks.
    pub heartbeat_interval: Duration,
    /// How long before a silent node is considered unreachable.
    pub node_timeout: Duration,
    /// Number of virtual nodes per physical node for consistent hashing.
    pub virtual_nodes: usize,
    /// Replication factor for stream partitions.
    pub replication_factor: usize,
}
422
423impl Default for TopologyConfig {
424    fn default() -> Self {
425        Self {
426            max_partitions_per_node: 64,
427            heartbeat_interval: Duration::from_secs(5),
428            node_timeout: Duration::from_secs(30),
429            virtual_nodes: 150,
430            replication_factor: 2,
431        }
432    }
433}
434
/// Statistics snapshot for the topology.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopologyStats {
    /// Total number of registered nodes.
    pub total_nodes: usize,
    /// Number of nodes currently in `Healthy` status.
    pub healthy_nodes: usize,
    /// Total partitions currently assigned in the partition map.
    pub total_partitions: usize,
    /// Average partitions per node (0.0 when there are no nodes).
    pub avg_partitions_per_node: f64,
    /// Timestamp at which this snapshot was taken.
    pub snapshot_time: std::time::SystemTime,
}
449
/// A topology change event broadcast on topology changes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TopologyChange {
    /// A new node joined the topology.
    NodeAdded(String),
    /// A node was removed from the topology.
    NodeRemoved(String),
    /// A partition was reassigned from one node to another.
    PartitionReassigned {
        /// The partition that moved.
        partition: u32,
        /// Previous owning node ID.
        from: String,
        /// New owning node ID.
        to: String,
    },
    /// A node changed health status.
    NodeStatusChanged { node_id: String, status: NodeStatus },
}
466
/// Coordinates stream processing across multiple cluster nodes.
///
/// Maintains cluster membership, assigns partitions to nodes using consistent
/// hashing, and broadcasts topology change events.
pub struct DistributedStreamTopology {
    // Retained configuration; currently only `virtual_nodes` is consumed (at
    // construction) by the code in this module.
    config: TopologyConfig,
    /// Consistent-hash router holding cluster membership.
    router: Arc<RwLock<ConsistentHashRouter>>,
    /// Partition assignment: partition_id to node_id.
    partition_map: Arc<RwLock<HashMap<u32, String>>>,
    /// Total number of partitions managed by this topology.
    total_partitions: u32,
    /// Sender side of the topology-change notification channel (bounded, 1024).
    change_tx: mpsc::Sender<TopologyChange>,
    /// Receiver exposed to callers via [`Self::change_receiver`].
    change_rx: Arc<tokio::sync::Mutex<mpsc::Receiver<TopologyChange>>>,
}
483
484impl DistributedStreamTopology {
485    /// Creates a new topology with the given configuration and partition count.
486    pub fn new(config: TopologyConfig, total_partitions: u32) -> Self {
487        let (change_tx, change_rx) = mpsc::channel(1024);
488        Self {
489            config: config.clone(),
490            router: Arc::new(RwLock::new(ConsistentHashRouter::new(config.virtual_nodes))),
491            partition_map: Arc::new(RwLock::new(HashMap::new())),
492            total_partitions,
493            change_tx,
494            change_rx: Arc::new(tokio::sync::Mutex::new(change_rx)),
495        }
496    }
497
498    /// Adds a node to the topology and rebalances all partitions.
499    pub async fn add_node(&self, node: StreamNode) -> DistributedResult<()> {
500        let node_id = node.node_id.clone();
501        info!("Adding node {} to topology", node_id);
502        self.router.write().add_node(node);
503        self.rebalance_partitions()?;
504        let _ = self
505            .change_tx
506            .send(TopologyChange::NodeAdded(node_id))
507            .await;
508        Ok(())
509    }
510
511    /// Removes a node from the topology and reassigns its partitions.
512    pub async fn remove_node(&self, node_id: &str) -> DistributedResult<()> {
513        info!("Removing node {} from topology", node_id);
514        self.router.write().remove_node(node_id)?;
515        let reassigned = self.reassign_from_node(node_id)?;
516        for (partition, to) in reassigned {
517            let _ = self
518                .change_tx
519                .send(TopologyChange::PartitionReassigned {
520                    partition,
521                    from: node_id.to_string(),
522                    to,
523                })
524                .await;
525        }
526        let _ = self
527            .change_tx
528            .send(TopologyChange::NodeRemoved(node_id.to_string()))
529            .await;
530        Ok(())
531    }
532
533    /// Routes a partition key to its responsible node, returning the node ID.
534    pub fn route(&self, partition_key: &str) -> DistributedResult<String> {
535        self.router
536            .read()
537            .route(partition_key)
538            .map(|n| n.node_id.clone())
539    }
540
541    /// Returns current topology statistics.
542    pub fn stats(&self) -> TopologyStats {
543        let router = self.router.read();
544        let total_nodes = router.node_count();
545        let healthy_nodes = router
546            .nodes()
547            .filter(|n| n.status == NodeStatus::Healthy)
548            .count();
549        let partition_map = self.partition_map.read();
550        let total_partitions = partition_map.len();
551        let avg_partitions_per_node = if total_nodes > 0 {
552            total_partitions as f64 / total_nodes as f64
553        } else {
554            0.0
555        };
556        TopologyStats {
557            total_nodes,
558            healthy_nodes,
559            total_partitions,
560            avg_partitions_per_node,
561            snapshot_time: std::time::SystemTime::now(),
562        }
563    }
564
565    /// Returns a handle to the topology change receiver.
566    pub fn change_receiver(&self) -> Arc<tokio::sync::Mutex<mpsc::Receiver<TopologyChange>>> {
567        Arc::clone(&self.change_rx)
568    }
569
570    fn rebalance_partitions(&self) -> DistributedResult<()> {
571        let mut partition_map = self.partition_map.write();
572        let router = self.router.read();
573        for partition in 0..self.total_partitions {
574            let key = partition.to_string();
575            let node_id = router.route(&key)?.node_id.clone();
576            partition_map.insert(partition, node_id);
577        }
578        debug!("Rebalanced {} partitions", self.total_partitions);
579        Ok(())
580    }
581
582    fn reassign_from_node(&self, removed_node_id: &str) -> DistributedResult<Vec<(u32, String)>> {
583        let mut partition_map = self.partition_map.write();
584        let router = self.router.read();
585        let mut reassigned = Vec::new();
586
587        for (partition, node_id) in partition_map.iter_mut() {
588            if node_id.as_str() == removed_node_id {
589                let key = partition.to_string();
590                let new_node = router.route(&key)?.node_id.clone();
591                reassigned.push((*partition, new_node.clone()));
592                *node_id = new_node;
593            }
594        }
595        Ok(reassigned)
596    }
597}
598
599// ─── Cluster Stream Coordinator ───────────────────────────────────────────────
600
/// A stream processing job that is distributed across cluster nodes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamJob {
    /// Unique job identifier.
    pub job_id: String,
    /// Human-readable job name.
    pub name: String,
    /// Partitions assigned to this job.
    pub partitions: Vec<u32>,
    /// Node assignments for each partition (partition -> node_id).
    pub node_assignments: HashMap<u32, String>,
    /// Job creation timestamp.
    pub created_at: std::time::SystemTime,
    /// Whether this job is currently active (cleared on cancellation).
    pub is_active: bool,
}
617
618impl StreamJob {
619    /// Creates a new stream job
620    pub fn new(job_id: impl Into<String>, name: impl Into<String>, partitions: Vec<u32>) -> Self {
621        Self {
622            job_id: job_id.into(),
623            name: name.into(),
624            partitions,
625            node_assignments: HashMap::new(),
626            created_at: std::time::SystemTime::now(),
627            is_active: true,
628        }
629    }
630}
631
/// Statistics for the cluster coordinator.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CoordinatorStats {
    /// Total jobs currently tracked (active and cancelled).
    pub total_jobs: usize,
    /// Currently active jobs.
    pub active_jobs: usize,
    /// Total partitions belonging to active jobs.
    pub total_partitions_managed: usize,
    /// Number of nodes in the cluster (from the topology snapshot).
    pub cluster_size: usize,
}
644
/// Manages stream job distribution across a cluster.
///
/// Uses the [`DistributedStreamTopology`] to place job partitions on nodes
/// and tracks running jobs.
pub struct ClusterStreamCoordinator {
    /// Shared topology used to route partitions to nodes.
    topology: Arc<DistributedStreamTopology>,
    /// All known jobs keyed by job ID (cancelled jobs remain queryable).
    jobs: Arc<RwLock<HashMap<String, StreamJob>>>,
    /// Construction time, used for uptime reporting.
    start_time: Instant,
}
654
655impl ClusterStreamCoordinator {
656    /// Creates a new coordinator backed by the given topology.
657    pub fn new(topology: Arc<DistributedStreamTopology>) -> Self {
658        Self {
659            topology,
660            jobs: Arc::new(RwLock::new(HashMap::new())),
661            start_time: Instant::now(),
662        }
663    }
664
665    /// Submits a new stream job, distributing its partitions across available nodes.
666    ///
667    /// Returns the job ID on success.
668    pub async fn submit_job(&self, mut job: StreamJob) -> DistributedResult<String> {
669        let job_id = job.job_id.clone();
670        info!(
671            "Submitting job {} with {} partitions",
672            job_id,
673            job.partitions.len()
674        );
675        for &partition in &job.partitions {
676            let key = partition.to_string();
677            let node_id = self.topology.route(&key)?;
678            job.node_assignments.insert(partition, node_id);
679        }
680        self.jobs.write().insert(job_id.clone(), job);
681        Ok(job_id)
682    }
683
684    /// Cancels an active job by ID.
685    pub fn cancel_job(&self, job_id: &str) -> DistributedResult<()> {
686        let mut jobs = self.jobs.write();
687        match jobs.get_mut(job_id) {
688            Some(job) => {
689                job.is_active = false;
690                info!("Cancelled job {}", job_id);
691                Ok(())
692            }
693            None => Err(DistributedStreamError::JobDistribution(format!(
694                "Job {} not found",
695                job_id
696            ))),
697        }
698    }
699
700    /// Returns a snapshot of a specific job by ID.
701    pub fn get_job(&self, job_id: &str) -> Option<StreamJob> {
702        self.jobs.read().get(job_id).cloned()
703    }
704
705    /// Returns coordinator statistics.
706    pub fn stats(&self) -> CoordinatorStats {
707        let jobs = self.jobs.read();
708        let active_jobs = jobs.values().filter(|j| j.is_active).count();
709        let total_partitions_managed = jobs
710            .values()
711            .filter(|j| j.is_active)
712            .map(|j| j.partitions.len())
713            .sum();
714        let topology_stats = self.topology.stats();
715        CoordinatorStats {
716            total_jobs: jobs.len(),
717            active_jobs,
718            total_partitions_managed,
719            cluster_size: topology_stats.total_nodes,
720        }
721    }
722
723    /// Returns the uptime of this coordinator.
724    pub fn uptime(&self) -> Duration {
725        self.start_time.elapsed()
726    }
727
728    /// Rebalances all active jobs when the topology changes.
729    ///
730    /// Returns the number of jobs that were rebalanced.
731    pub async fn rebalance_all_jobs(&self) -> DistributedResult<usize> {
732        let job_ids: Vec<String> = self
733            .jobs
734            .read()
735            .values()
736            .filter(|j| j.is_active)
737            .map(|j| j.job_id.clone())
738            .collect();
739        let count = job_ids.len();
740        for job_id in job_ids {
741            let mut jobs = self.jobs.write();
742            if let Some(job) = jobs.get_mut(&job_id) {
743                job.node_assignments.clear();
744                for &partition in &job.partitions.clone() {
745                    let key = partition.to_string();
746                    let node_id = self.topology.route(&key)?;
747                    job.node_assignments.insert(partition, node_id);
748                }
749            }
750        }
751        info!("Rebalanced {} active jobs", count);
752        Ok(count)
753    }
754}
755
756// ─── Tests ───────────────────────────────────────────────────────────────────
757
758#[cfg(test)]
759mod tests {
760    use super::*;
761
    /// Routing succeeds once nodes are registered, and `node_count` reflects
    /// physical (not virtual) nodes.
    #[test]
    fn test_consistent_hash_router_basic() {
        let mut router = ConsistentHashRouter::new(10);
        router.add_node(StreamNode::new("node-1", "127.0.0.1:8001"));
        router.add_node(StreamNode::new("node-2", "127.0.0.1:8002"));
        router.add_node(StreamNode::new("node-3", "127.0.0.1:8003"));

        let node = router.route("partition-42").expect("route should succeed");
        assert!(!node.node_id.is_empty());
        assert_eq!(router.node_count(), 3);
    }
773
    /// An empty ring must yield `NoNodesAvailable` rather than panicking.
    #[test]
    fn test_consistent_hash_router_no_nodes() {
        let router = ConsistentHashRouter::new(10);
        let result = router.route("partition-1");
        assert!(matches!(
            result,
            Err(DistributedStreamError::NoNodesAvailable)
        ));
    }
783
    /// Removing a registered node shrinks the node count; removing an unknown
    /// node reports `NodeNotFound`.
    #[test]
    fn test_consistent_hash_router_remove_node() {
        let mut router = ConsistentHashRouter::new(10);
        router.add_node(StreamNode::new("node-1", "127.0.0.1:8001"));
        router.add_node(StreamNode::new("node-2", "127.0.0.1:8002"));

        router.remove_node("node-1").expect("remove should succeed");
        assert_eq!(router.node_count(), 1);

        let result = router.remove_node("ghost-node");
        assert!(matches!(
            result,
            Err(DistributedStreamError::NodeNotFound { .. })
        ));
    }
799
    /// With 150 virtual nodes per physical node, 300 keys should spread across
    /// all three nodes; the assertion only guards against gross skew.
    #[test]
    fn test_consistent_hash_distribution() {
        let mut router = ConsistentHashRouter::new(150);
        router.add_node(StreamNode::new("node-a", "10.0.0.1:9000"));
        router.add_node(StreamNode::new("node-b", "10.0.0.2:9000"));
        router.add_node(StreamNode::new("node-c", "10.0.0.3:9000"));

        let mut counts: HashMap<String, usize> = HashMap::new();
        for i in 0..300u32 {
            let key = format!("partition-{}", i);
            let node = router.route(&key).expect("route should succeed");
            *counts.entry(node.node_id.clone()).or_insert(0) += 1;
        }
        // Each node should receive roughly 100 partitions (wide tolerance for small ring)
        for (node_id, count) in &counts {
            assert!(
                *count > 20,
                "distribution too skewed for {}: got {}",
                node_id,
                count
            );
        }
    }
823
    /// Aggregation completes only after all three expected nodes contribute,
    /// and the merged sums/min/max/count combine every partial result.
    #[test]
    fn test_distributed_window_aggregator_full() {
        let expected: HashSet<String> = ["node-1", "node-2", "node-3"]
            .iter()
            .map(|s| s.to_string())
            .collect();
        let aggregator = DistributedWindowAggregator::new(expected, Duration::from_secs(10));

        let p1 = PartialWindowResult {
            window_id: 1000,
            source_node: "node-1".to_string(),
            event_count: 100,
            partial_sum: 50.0,
            partial_min: 1.0,
            partial_max: 10.0,
            is_complete: true,
        };
        let result = aggregator
            .submit_partial(p1)
            .expect("submit should succeed");
        assert!(result.is_none(), "should not complete with only 1/3 nodes");

        let p2 = PartialWindowResult {
            window_id: 1000,
            source_node: "node-2".to_string(),
            event_count: 200,
            partial_sum: 150.0,
            partial_min: 0.5,
            partial_max: 15.0,
            is_complete: true,
        };
        let result = aggregator
            .submit_partial(p2)
            .expect("submit should succeed");
        assert!(result.is_none(), "should not complete with only 2/3 nodes");

        let p3 = PartialWindowResult {
            window_id: 1000,
            source_node: "node-3".to_string(),
            event_count: 300,
            partial_sum: 300.0,
            partial_min: 0.1,
            partial_max: 20.0,
            is_complete: true,
        };
        let result = aggregator
            .submit_partial(p3)
            .expect("submit should succeed");
        assert!(result.is_some(), "should complete with all 3 nodes");

        // Expected: 100+200+300 events, 50+150+300 sum, min of mins, max of maxes.
        let agg = result.expect("aggregate must be Some");
        assert_eq!(agg.window_id, 1000);
        assert_eq!(agg.total_events, 600);
        assert!((agg.global_sum - 500.0).abs() < 1e-9);
        assert!((agg.global_min - 0.1).abs() < 1e-9);
        assert!((agg.global_max - 20.0).abs() < 1e-9);
        assert!(agg.is_complete);
    }
882
883    #[test]
884    fn test_distributed_window_force_aggregate() {
885        let expected: HashSet<String> =
886            ["node-1", "node-2"].iter().map(|s| s.to_string()).collect();
887        let aggregator = DistributedWindowAggregator::new(expected, Duration::from_secs(10));
888
889        let p = PartialWindowResult {
890            window_id: 2000,
891            source_node: "node-1".to_string(),
892            event_count: 50,
893            partial_sum: 25.0,
894            partial_min: 2.0,
895            partial_max: 8.0,
896            is_complete: false,
897        };
898        aggregator.submit_partial(p).expect("submit should succeed");
899
900        let agg = aggregator
901            .force_aggregate(2000)
902            .expect("force aggregate should succeed");
903        assert_eq!(agg.window_id, 2000);
904        assert_eq!(agg.total_events, 50);
905        assert!(!agg.is_complete);
906    }
907
908    #[test]
909    fn test_distributed_window_drain_completed() {
910        let expected: HashSet<String> = ["node-x"].iter().map(|s| s.to_string()).collect();
911        let aggregator = DistributedWindowAggregator::new(expected, Duration::from_secs(5));
912
913        let p = PartialWindowResult {
914            window_id: 3000,
915            source_node: "node-x".to_string(),
916            event_count: 10,
917            partial_sum: 5.0,
918            partial_min: 1.0,
919            partial_max: 2.0,
920            is_complete: true,
921        };
922        aggregator.submit_partial(p).expect("submit should succeed");
923
924        let drained = aggregator.drain_completed();
925        assert_eq!(drained.len(), 1);
926        assert_eq!(drained[0].window_id, 3000);
927
928        // Draining again should yield empty
929        let empty = aggregator.drain_completed();
930        assert!(empty.is_empty());
931    }
932
933    #[tokio::test]
934    async fn test_distributed_stream_topology_add_remove() {
935        let config = TopologyConfig::default();
936        let topology = DistributedStreamTopology::new(config, 16);
937
938        topology
939            .add_node(StreamNode::new("node-a", "10.0.0.1:9000"))
940            .await
941            .expect("add node should succeed");
942        topology
943            .add_node(StreamNode::new("node-b", "10.0.0.2:9000"))
944            .await
945            .expect("add node should succeed");
946
947        let stats = topology.stats();
948        assert_eq!(stats.total_nodes, 2);
949        assert_eq!(stats.total_partitions, 16);
950
951        let routed = topology.route("test-key").expect("route should succeed");
952        assert!(!routed.is_empty());
953
954        topology
955            .remove_node("node-a")
956            .await
957            .expect("remove should succeed");
958        let stats = topology.stats();
959        assert_eq!(stats.total_nodes, 1);
960    }
961
962    #[tokio::test]
963    async fn test_cluster_stream_coordinator_submit_and_stats() {
964        let config = TopologyConfig::default();
965        let topology = Arc::new(DistributedStreamTopology::new(config, 32));
966        topology
967            .add_node(StreamNode::new("worker-1", "10.0.0.1:9000"))
968            .await
969            .expect("add node should succeed");
970        topology
971            .add_node(StreamNode::new("worker-2", "10.0.0.2:9000"))
972            .await
973            .expect("add node should succeed");
974
975        let coordinator = ClusterStreamCoordinator::new(Arc::clone(&topology));
976        let job = StreamJob::new("job-001", "sensor-stream", vec![0, 1, 2, 3, 4, 5, 6, 7]);
977        let job_id = coordinator
978            .submit_job(job)
979            .await
980            .expect("submit should succeed");
981        assert_eq!(job_id, "job-001");
982
983        let retrieved = coordinator.get_job("job-001").expect("job should exist");
984        assert_eq!(retrieved.node_assignments.len(), 8);
985
986        let stats = coordinator.stats();
987        assert_eq!(stats.active_jobs, 1);
988        assert_eq!(stats.total_partitions_managed, 8);
989        assert_eq!(stats.cluster_size, 2);
990    }
991
992    #[tokio::test]
993    async fn test_cluster_stream_coordinator_cancel() {
994        let config = TopologyConfig::default();
995        let topology = Arc::new(DistributedStreamTopology::new(config, 8));
996        topology
997            .add_node(StreamNode::new("n1", "localhost:9001"))
998            .await
999            .expect("add node");
1000
1001        let coordinator = ClusterStreamCoordinator::new(Arc::clone(&topology));
1002        let job = StreamJob::new("job-cancel", "cancel-test", vec![0, 1]);
1003        coordinator
1004            .submit_job(job)
1005            .await
1006            .expect("submit should succeed");
1007
1008        coordinator
1009            .cancel_job("job-cancel")
1010            .expect("cancel should succeed");
1011        let job = coordinator
1012            .get_job("job-cancel")
1013            .expect("job should still exist after cancel");
1014        assert!(!job.is_active);
1015
1016        let result = coordinator.cancel_job("nonexistent");
1017        assert!(matches!(
1018            result,
1019            Err(DistributedStreamError::JobDistribution(_))
1020        ));
1021    }
1022
1023    #[tokio::test]
1024    async fn test_rebalance_jobs_after_node_change() {
1025        let config = TopologyConfig::default();
1026        let topology = Arc::new(DistributedStreamTopology::new(config, 16));
1027        topology
1028            .add_node(StreamNode::new("n1", "localhost:9001"))
1029            .await
1030            .expect("add node");
1031        topology
1032            .add_node(StreamNode::new("n2", "localhost:9002"))
1033            .await
1034            .expect("add node");
1035
1036        let coordinator = ClusterStreamCoordinator::new(Arc::clone(&topology));
1037        let job = StreamJob::new("job-rebalance", "rebalance-test", (0u32..8).collect());
1038        coordinator
1039            .submit_job(job)
1040            .await
1041            .expect("submit should succeed");
1042
1043        topology
1044            .add_node(StreamNode::new("n3", "localhost:9003"))
1045            .await
1046            .expect("add node");
1047        let rebalanced = coordinator
1048            .rebalance_all_jobs()
1049            .await
1050            .expect("rebalance should succeed");
1051        assert_eq!(rebalanced, 1);
1052    }
1053}