Skip to main content

peat_protocol/models/
zone.rs

1//! Zone model for hierarchical coordination (E5 Phase 2)
2//!
3//! Zones represent the highest level of the three-tier hierarchy:
4//! - Nodes: Individual platforms with capabilities
5//! - Cells: Tactical groups of nodes working together
6//! - Zones: Strategic coordination across multiple cells
7//!
8//! This module implements CRDT-based zone state management with:
9//! - Cell membership (OR-Set)
10//! - Zone coordinator assignment (LWW-Register)
11//! - Capability aggregation (G-Set)
12//!
13//! ## Protobuf Integration
14//!
15//! This module uses protobuf types from `peat_schema::zone::v1` for multi-transport
16//! support and cross-language compatibility. Extension traits provide CRDT semantics
17//! and helper methods on top of the protobuf types.
18
19use crate::models::{Capability, CapabilityExt};
20use std::time::{SystemTime, UNIX_EPOCH};
21
22// Re-export protobuf types
23pub use peat_schema::zone::v1::{ZoneConfig, ZoneState, ZoneStats};
24
25/// Extension trait for ZoneConfig helper methods
26///
27/// # Example
28/// ```
29/// use peat_protocol::models::zone::{ZoneConfig, ZoneConfigExt};
30///
31/// let config = ZoneConfig::new("zone_north".to_string(), 10);
32/// assert_eq!(config.max_cells, 10);
33/// assert_eq!(config.min_cells, 2);
34/// ```
35pub trait ZoneConfigExt {
36    /// Create a new zone configuration
37    ///
38    /// # Arguments
39    /// * `id` - Unique zone identifier
40    /// * `max_cells` - Maximum number of cells allowed in zone
41    ///
42    /// # Example
43    /// ```
44    /// use peat_protocol::models::zone::{ZoneConfig, ZoneConfigExt};
45    ///
46    /// let config = ZoneConfig::new("zone_alpha".to_string(), 8);
47    /// ```
48    fn new(id: String, max_cells: u32) -> Self;
49
50    /// Create configuration with custom minimum cells
51    fn with_min_cells(self, min_cells: u32) -> Self;
52}
53
54impl ZoneConfigExt for ZoneConfig {
55    fn new(id: String, max_cells: u32) -> Self {
56        Self {
57            id,
58            max_cells,
59            min_cells: 2, // Default minimum
60            created_at: Some(peat_schema::common::v1::Timestamp {
61                seconds: SystemTime::now()
62                    .duration_since(UNIX_EPOCH)
63                    .unwrap()
64                    .as_secs(),
65                nanos: 0,
66            }),
67        }
68    }
69
70    fn with_min_cells(mut self, min_cells: u32) -> Self {
71        self.min_cells = min_cells;
72        self
73    }
74}
75
76/// Extension trait for ZoneState with CRDT operations
77///
78/// Uses multiple CRDT types for distributed consistency:
79/// - Commander: LWW-Register (Last-Write-Wins)
80/// - Cells: OR-Set (Observed-Remove Set)
81/// - Capabilities: G-Set (Grow-only Set)
82///
83/// # Example
84/// ```
85/// use peat_protocol::models::zone::{ZoneConfig, ZoneConfigExt, ZoneState, ZoneStateExt};
86///
87/// let config = ZoneConfig::new("zone_1".to_string(), 10);
88/// let mut zone = ZoneState::new(config);
89///
90/// // Add cells to zone
91/// zone.add_cell("cell_alpha".to_string());
92/// zone.add_cell("cell_beta".to_string());
93///
94/// assert_eq!(zone.cell_count(), 2);
95/// assert!(zone.is_valid()); // Meets minimum cells
96/// ```
97pub trait ZoneStateExt {
98    /// Create a new zone state from configuration
99    ///
100    /// # Example
101    /// ```
102    /// use peat_protocol::models::zone::{ZoneConfig, ZoneConfigExt, ZoneState, ZoneStateExt};
103    ///
104    /// let config = ZoneConfig::new("zone_north".to_string(), 5);
105    /// let zone = ZoneState::new(config);
106    /// ```
107    fn new(config: ZoneConfig) -> Self;
108
109    /// Add a cell to the zone (OR-Set add operation)
110    ///
111    /// Returns `true` if cell was added, `false` if already present or zone is full.
112    ///
113    /// # Example
114    /// ```
115    /// use peat_protocol::models::zone::{ZoneConfig, ZoneConfigExt, ZoneState, ZoneStateExt};
116    ///
117    /// let config = ZoneConfig::new("zone_1".to_string(), 3);
118    /// let mut zone = ZoneState::new(config);
119    ///
120    /// assert!(zone.add_cell("cell_1".to_string()));
121    /// assert!(!zone.add_cell("cell_1".to_string())); // Already present
122    /// ```
123    fn add_cell(&mut self, cell_id: String) -> bool;
124
125    /// Remove a cell from the zone (OR-Set remove operation)
126    ///
127    /// Returns `true` if cell was removed, `false` if not present.
128    fn remove_cell(&mut self, cell_id: &str) -> bool;
129
130    /// Set the zone coordinator (LWW-Register operation)
131    ///
132    /// The coordinator must be a leader of a cell within this zone.
133    ///
134    /// # Arguments
135    /// * `coordinator_id` - Node ID of the coordinator
136    /// * `timestamp` - Logical timestamp for conflict resolution
137    ///
138    /// # Returns
139    /// `true` if assignment was applied, `false` if rejected due to older timestamp
140    fn set_coordinator(&mut self, coordinator_id: String, timestamp: u64) -> bool;
141
142    /// Remove the zone coordinator (LWW-Register deletion)
143    fn remove_coordinator(&mut self, timestamp: u64) -> bool;
144
145    /// Add an aggregated capability (G-Set add operation)
146    ///
147    /// Capabilities are grow-only - once added, they cannot be removed.
148    fn add_capability(&mut self, capability: Capability);
149
150    /// Check if zone meets minimum cell requirement
151    fn is_valid(&self) -> bool;
152
153    /// Check if zone is at maximum capacity
154    fn is_full(&self) -> bool;
155
156    /// Get the number of cells in this zone
157    fn cell_count(&self) -> usize;
158
159    /// Check if a specific cell is a member of this zone
160    fn contains_cell(&self, cell_id: &str) -> bool;
161
162    /// Merge another zone state into this one (CRDT merge)
163    ///
164    /// Applies CRDT semantics for each component:
165    /// - Coordinator: LWW based on timestamp
166    /// - Cells: OR-Set union
167    /// - Capabilities: G-Set union
168    ///
169    /// # Panics
170    /// Panics if attempting to merge zones with different IDs
171    fn merge(&mut self, other: &ZoneState);
172
173    /// Update timestamp to current time
174    fn update_timestamp(&mut self);
175
176    /// Get zone statistics
177    fn stats(&self) -> ZoneStats;
178}
179
180impl ZoneStateExt for ZoneState {
181    fn new(config: ZoneConfig) -> Self {
182        Self {
183            config: Some(config),
184            coordinator_id: None,
185            cells: Vec::new(),
186            aggregated_capabilities: Vec::new(),
187            timestamp: Some(peat_schema::common::v1::Timestamp {
188                seconds: SystemTime::now()
189                    .duration_since(UNIX_EPOCH)
190                    .unwrap()
191                    .as_secs(),
192                nanos: 0,
193            }),
194        }
195    }
196
197    fn add_cell(&mut self, cell_id: String) -> bool {
198        if self.is_full() {
199            return false;
200        }
201
202        // Check if already present (protobuf uses Vec instead of HashSet)
203        if self.cells.iter().any(|id| id == &cell_id) {
204            return false;
205        }
206
207        self.cells.push(cell_id);
208        self.update_timestamp();
209        true
210    }
211
212    fn remove_cell(&mut self, cell_id: &str) -> bool {
213        if let Some(pos) = self.cells.iter().position(|id| id == cell_id) {
214            self.cells.remove(pos);
215            self.update_timestamp();
216            true
217        } else {
218            false
219        }
220    }
221
222    fn set_coordinator(&mut self, coordinator_id: String, timestamp: u64) -> bool {
223        let current_ts = self.timestamp.as_ref().map(|t| t.seconds).unwrap_or(0);
224
225        if timestamp < current_ts {
226            return false;
227        }
228
229        self.coordinator_id = Some(coordinator_id);
230        self.timestamp = Some(peat_schema::common::v1::Timestamp {
231            seconds: timestamp,
232            nanos: 0,
233        });
234        true
235    }
236
237    fn remove_coordinator(&mut self, timestamp: u64) -> bool {
238        let current_ts = self.timestamp.as_ref().map(|t| t.seconds).unwrap_or(0);
239
240        if timestamp < current_ts {
241            return false;
242        }
243
244        self.coordinator_id = None;
245        self.timestamp = Some(peat_schema::common::v1::Timestamp {
246            seconds: timestamp,
247            nanos: 0,
248        });
249        true
250    }
251
252    fn add_capability(&mut self, capability: Capability) {
253        // Check if capability already exists (by type)
254        if !self
255            .aggregated_capabilities
256            .iter()
257            .any(|c| c.get_capability_type() == capability.get_capability_type())
258        {
259            self.aggregated_capabilities.push(capability);
260            self.update_timestamp();
261        }
262    }
263
264    fn is_valid(&self) -> bool {
265        if let Some(ref config) = self.config {
266            self.cells.len() >= config.min_cells as usize
267        } else {
268            false
269        }
270    }
271
272    fn is_full(&self) -> bool {
273        if let Some(ref config) = self.config {
274            self.cells.len() >= config.max_cells as usize
275        } else {
276            false
277        }
278    }
279
280    fn cell_count(&self) -> usize {
281        self.cells.len()
282    }
283
284    fn contains_cell(&self, cell_id: &str) -> bool {
285        self.cells.iter().any(|id| id == cell_id)
286    }
287
288    fn merge(&mut self, other: &ZoneState) {
289        // Verify we're merging the same zone
290        let self_id = self.config.as_ref().map(|c| &c.id);
291        let other_id = other.config.as_ref().map(|c| &c.id);
292
293        assert_eq!(self_id, other_id, "Cannot merge zones with different IDs");
294
295        // LWW-Register merge for coordinator
296        let self_ts = self.timestamp.as_ref().map(|t| t.seconds).unwrap_or(0);
297        let other_ts = other.timestamp.as_ref().map(|t| t.seconds).unwrap_or(0);
298
299        if other_ts > self_ts {
300            self.coordinator_id = other.coordinator_id.clone();
301            self.timestamp = other.timestamp;
302        }
303
304        // OR-Set merge for cells (union, avoiding duplicates)
305        for cell_id in &other.cells {
306            if !self.cells.iter().any(|id| id == cell_id) {
307                self.cells.push(cell_id.clone());
308            }
309        }
310
311        // G-Set merge for capabilities (union by type)
312        for capability in &other.aggregated_capabilities {
313            if !self
314                .aggregated_capabilities
315                .iter()
316                .any(|c| c.get_capability_type() == capability.get_capability_type())
317            {
318                self.aggregated_capabilities.push(capability.clone());
319            }
320        }
321    }
322
323    fn update_timestamp(&mut self) {
324        self.timestamp = Some(peat_schema::common::v1::Timestamp {
325            seconds: SystemTime::now()
326                .duration_since(UNIX_EPOCH)
327                .unwrap()
328                .as_secs(),
329            nanos: 0,
330        });
331    }
332
333    fn stats(&self) -> ZoneStats {
334        let zone_id = self
335            .config
336            .as_ref()
337            .map(|c| c.id.clone())
338            .unwrap_or_default();
339
340        ZoneStats {
341            zone_id,
342            cell_count: self.cells.len() as u32,
343            total_nodes: 0, // This would need to be calculated from actual cell data
344            unique_capability_count: self.aggregated_capabilities.len() as u32,
345            is_valid: self.is_valid(),
346            is_full: self.is_full(),
347            calculated_at: Some(peat_schema::common::v1::Timestamp {
348                seconds: SystemTime::now()
349                    .duration_since(UNIX_EPOCH)
350                    .unwrap()
351                    .as_secs(),
352                nanos: 0,
353            }),
354        }
355    }
356}
357
358#[cfg(test)]
359mod tests {
360    use super::*;
361    use crate::models::CapabilityType;
362
363    #[test]
364    fn test_zone_config_creation() {
365        let config = ZoneConfig::new("zone_test".to_string(), 10);
366        assert_eq!(config.id, "zone_test");
367        assert_eq!(config.max_cells, 10);
368        assert_eq!(config.min_cells, 2);
369    }
370
371    #[test]
372    fn test_zone_config_custom_min() {
373        let config = ZoneConfig::new("zone_test".to_string(), 10).with_min_cells(3);
374        assert_eq!(config.min_cells, 3);
375    }
376
377    #[test]
378    fn test_zone_state_creation() {
379        let config = ZoneConfig::new("zone_1".to_string(), 5);
380        let zone = ZoneState::new(config);
381
382        assert_eq!(zone.config.as_ref().unwrap().id, "zone_1");
383        assert_eq!(zone.cells.len(), 0);
384        assert!(zone.coordinator_id.is_none());
385        assert!(!zone.is_valid()); // Not enough cells
386    }
387
388    #[test]
389    fn test_add_cell() {
390        let config = ZoneConfig::new("zone_1".to_string(), 5);
391        let mut zone = ZoneState::new(config);
392
393        assert!(zone.add_cell("cell_1".to_string()));
394        assert!(zone.add_cell("cell_2".to_string()));
395        assert_eq!(zone.cell_count(), 2);
396
397        // Duplicate add should return false
398        assert!(!zone.add_cell("cell_1".to_string()));
399        assert_eq!(zone.cell_count(), 2);
400    }
401
402    #[test]
403    fn test_remove_cell() {
404        let config = ZoneConfig::new("zone_1".to_string(), 5);
405        let mut zone = ZoneState::new(config);
406
407        zone.add_cell("cell_1".to_string());
408        zone.add_cell("cell_2".to_string());
409
410        assert!(zone.remove_cell("cell_1"));
411        assert_eq!(zone.cell_count(), 1);
412
413        // Removing non-existent cell should return false
414        assert!(!zone.remove_cell("cell_3"));
415    }
416
417    #[test]
418    fn test_zone_capacity() {
419        let config = ZoneConfig::new("zone_1".to_string(), 3);
420        let mut zone = ZoneState::new(config);
421
422        assert!(zone.add_cell("cell_1".to_string()));
423        assert!(zone.add_cell("cell_2".to_string()));
424        assert!(zone.add_cell("cell_3".to_string()));
425
426        assert!(zone.is_full());
427        assert!(!zone.add_cell("cell_4".to_string())); // Should fail - at capacity
428    }
429
430    #[test]
431    fn test_zone_validity() {
432        let config = ZoneConfig::new("zone_1".to_string(), 5).with_min_cells(2);
433        let mut zone = ZoneState::new(config);
434
435        assert!(!zone.is_valid()); // 0 cells
436
437        zone.add_cell("cell_1".to_string());
438        assert!(!zone.is_valid()); // 1 cell, needs 2
439
440        zone.add_cell("cell_2".to_string());
441        assert!(zone.is_valid()); // 2 cells, meets minimum
442    }
443
444    #[test]
445    fn test_set_coordinator() {
446        let config = ZoneConfig::new("zone_1".to_string(), 5);
447        let mut zone = ZoneState::new(config);
448
449        let initial_ts = zone.timestamp.as_ref().unwrap().seconds;
450        let ts1 = initial_ts + 100;
451        assert!(zone.set_coordinator("node_1".to_string(), ts1));
452        assert_eq!(zone.coordinator_id, Some("node_1".to_string()));
453
454        // Older timestamp should be rejected
455        assert!(!zone.set_coordinator("node_2".to_string(), initial_ts + 50));
456        assert_eq!(zone.coordinator_id, Some("node_1".to_string()));
457
458        // Newer timestamp should win
459        assert!(zone.set_coordinator("node_2".to_string(), ts1 + 100));
460        assert_eq!(zone.coordinator_id, Some("node_2".to_string()));
461    }
462
463    #[test]
464    fn test_remove_coordinator() {
465        let config = ZoneConfig::new("zone_1".to_string(), 5);
466        let mut zone = ZoneState::new(config);
467
468        let initial_ts = zone.timestamp.as_ref().unwrap().seconds;
469        zone.set_coordinator("node_1".to_string(), initial_ts + 100);
470
471        // Old timestamp should be rejected
472        assert!(!zone.remove_coordinator(initial_ts + 50));
473        assert_eq!(zone.coordinator_id, Some("node_1".to_string()));
474
475        // Newer timestamp should succeed
476        assert!(zone.remove_coordinator(initial_ts + 200));
477        assert_eq!(zone.coordinator_id, None);
478    }
479
480    #[test]
481    fn test_add_capability() {
482        let config = ZoneConfig::new("zone_1".to_string(), 5);
483        let mut zone = ZoneState::new(config);
484
485        let cap1 = Capability::new(
486            "cap_1".to_string(),
487            "Sensor Capability".to_string(),
488            CapabilityType::Sensor,
489            0.9,
490        );
491
492        zone.add_capability(cap1.clone());
493        assert_eq!(zone.aggregated_capabilities.len(), 1);
494
495        // Adding same capability type again should not duplicate
496        zone.add_capability(cap1.clone());
497        assert_eq!(zone.aggregated_capabilities.len(), 1);
498
499        let cap2 = Capability::new(
500            "cap_2".to_string(),
501            "Payload Capability".to_string(),
502            CapabilityType::Payload,
503            0.8,
504        );
505
506        zone.add_capability(cap2);
507        assert_eq!(zone.aggregated_capabilities.len(), 2);
508    }
509
510    #[test]
511    fn test_contains_cell() {
512        let config = ZoneConfig::new("zone_1".to_string(), 5);
513        let mut zone = ZoneState::new(config);
514
515        zone.add_cell("cell_1".to_string());
516        assert!(zone.contains_cell("cell_1"));
517        assert!(!zone.contains_cell("cell_2"));
518    }
519
520    #[test]
521    fn test_merge_zones() {
522        let config1 = ZoneConfig::new("zone_1".to_string(), 10);
523        let mut zone1 = ZoneState::new(config1);
524
525        let initial_ts = zone1.timestamp.as_ref().unwrap().seconds;
526
527        zone1.add_cell("cell_1".to_string());
528        zone1.add_cell("cell_2".to_string());
529        zone1.set_coordinator("node_1".to_string(), initial_ts + 100);
530
531        let config2 = ZoneConfig::new("zone_1".to_string(), 10);
532        let mut zone2 = ZoneState::new(config2);
533
534        zone2.add_cell("cell_2".to_string()); // Duplicate
535        zone2.add_cell("cell_3".to_string()); // New
536        zone2.set_coordinator("node_2".to_string(), initial_ts + 200); // Newer
537
538        zone1.merge(&zone2);
539
540        // Should have union of cells
541        assert_eq!(zone1.cell_count(), 3);
542        assert!(zone1.contains_cell("cell_1"));
543        assert!(zone1.contains_cell("cell_2"));
544        assert!(zone1.contains_cell("cell_3"));
545
546        // Should have newer coordinator
547        assert_eq!(zone1.coordinator_id, Some("node_2".to_string()));
548    }
549
550    #[test]
551    fn test_merge_capabilities() {
552        let config1 = ZoneConfig::new("zone_1".to_string(), 10);
553        let mut zone1 = ZoneState::new(config1);
554
555        let cap1 = Capability::new(
556            "cap_1".to_string(),
557            "Sensor".to_string(),
558            CapabilityType::Sensor,
559            0.9,
560        );
561        zone1.add_capability(cap1);
562
563        let config2 = ZoneConfig::new("zone_1".to_string(), 10);
564        let mut zone2 = ZoneState::new(config2);
565
566        let cap2 = Capability::new(
567            "cap_2".to_string(),
568            "Payload".to_string(),
569            CapabilityType::Payload,
570            0.8,
571        );
572        zone2.add_capability(cap2);
573
574        zone1.merge(&zone2);
575
576        // Should have both capabilities
577        assert_eq!(zone1.aggregated_capabilities.len(), 2);
578    }
579
580    #[test]
581    #[should_panic(expected = "Cannot merge zones with different IDs")]
582    fn test_merge_different_zones_panics() {
583        let config1 = ZoneConfig::new("zone_1".to_string(), 10);
584        let mut zone1 = ZoneState::new(config1);
585
586        let config2 = ZoneConfig::new("zone_2".to_string(), 10);
587        let zone2 = ZoneState::new(config2);
588
589        zone1.merge(&zone2); // Should panic
590    }
591
592    #[test]
593    fn test_zone_stats() {
594        let config = ZoneConfig::new("zone_test".to_string(), 5).with_min_cells(2);
595        let mut zone = ZoneState::new(config);
596
597        let initial_ts = zone.timestamp.as_ref().unwrap().seconds;
598
599        zone.add_cell("cell_1".to_string());
600        zone.add_cell("cell_2".to_string());
601        zone.set_coordinator("node_1".to_string(), initial_ts + 100);
602
603        let stats = zone.stats();
604        assert_eq!(stats.zone_id, "zone_test");
605        assert_eq!(stats.cell_count, 2);
606        assert!(stats.is_valid);
607        assert!(!stats.is_full);
608    }
609}