Skip to main content

peat_protocol/models/cell/
mod.rs

1//! Cell state data structures
2//!
3//! This module defines squad data models with CRDT operations:
4//! - Member list: OR-Set (observed-remove set) - members can be added and removed
5//! - Leader election: LWW-Register (last-write-wins) - leader updates with timestamps
6//! - Aggregated capabilities: G-Set (grow-only set) - capabilities accumulate
7
8use crate::models::{Capability, CapabilityExt};
9use uuid::Uuid;
10
11// Re-export protobuf types
12pub use peat_schema::cell::v1::{CellConfig, CellState};
13
14// Extension trait for CellConfig helper methods
15pub trait CellConfigExt {
16    /// Create a new cell configuration
17    fn new(max_size: u32) -> Self;
18
19    /// Create a new cell configuration with a specific ID
20    fn with_id(id: String, max_size: u32) -> Self;
21}
22
23impl CellConfigExt for CellConfig {
24    fn new(max_size: u32) -> Self {
25        Self {
26            id: Uuid::new_v4().to_string(),
27            max_size,
28            min_size: 2,
29            created_at: Some(peat_schema::common::v1::Timestamp {
30                seconds: std::time::SystemTime::now()
31                    .duration_since(std::time::UNIX_EPOCH)
32                    .unwrap()
33                    .as_secs(),
34                nanos: 0,
35            }),
36        }
37    }
38
39    fn with_id(id: String, max_size: u32) -> Self {
40        Self {
41            id,
42            max_size,
43            min_size: 2,
44            created_at: Some(peat_schema::common::v1::Timestamp {
45                seconds: std::time::SystemTime::now()
46                    .duration_since(std::time::UNIX_EPOCH)
47                    .unwrap()
48                    .as_secs(),
49                nanos: 0,
50            }),
51        }
52    }
53}
54
55// Extension trait for CellState helper methods with CRDT operations
56pub trait CellStateExt {
57    /// Create a new cell state
58    fn new(config: CellConfig) -> Self;
59
60    /// Update the timestamp to current time
61    fn update_timestamp(&mut self);
62
63    /// Check if the cell is at capacity
64    fn is_full(&self) -> bool;
65
66    /// Check if the cell meets minimum size
67    fn is_valid(&self) -> bool;
68
69    /// Add a member to the cell (OR-Set add operation)
70    ///
71    /// This implements an OR-Set CRDT where members can be added and removed.
72    /// Concurrent add/remove operations are resolved by: Add wins over Remove.
73    fn add_member(&mut self, node_id: String) -> bool;
74
75    /// Remove a member from the cell (OR-Set remove operation)
76    fn remove_member(&mut self, node_id: &str) -> bool;
77
78    /// Set the cell leader (LWW-Register operation)
79    ///
80    /// This implements Last-Write-Wins semantics for leader election.
81    /// The leader must be a current member of the cell.
82    fn set_leader(&mut self, node_id: String) -> Result<(), &'static str>;
83
84    /// Clear the cell leader
85    fn clear_leader(&mut self);
86
87    /// Add a capability to the cell (G-Set operation)
88    ///
89    /// This implements a G-Set CRDT where capabilities can only be added.
90    /// Capabilities are aggregated from all cell members.
91    fn add_capability(&mut self, capability: Capability);
92
93    /// Get all capabilities of a specific type
94    fn get_capabilities_by_type(
95        &self,
96        capability_type: crate::models::CapabilityType,
97    ) -> Vec<&Capability>;
98
99    /// Check if cell has a specific capability type
100    fn has_capability_type(&self, capability_type: crate::models::CapabilityType) -> bool;
101
102    /// Assign cell to a platoon (LWW-Register operation)
103    fn assign_platoon(&mut self, platoon_id: String);
104
105    /// Remove cell from platoon
106    fn leave_platoon(&mut self);
107
108    /// Merge with another cell state (CRDT merge)
109    ///
110    /// Merges two cell states using CRDT semantics:
111    /// - Members: Union (OR-Set merge)
112    /// - Leader: Take newer timestamp (LWW-Register merge)
113    /// - Capabilities: Union (G-Set merge)
114    fn merge(&mut self, other: &CellState);
115
116    /// Get the count of members
117    fn member_count(&self) -> usize;
118
119    /// Check if a node is a member
120    fn is_member(&self, node_id: &str) -> bool;
121
122    /// Check if this node is the leader
123    fn is_leader(&self, node_id: &str) -> bool;
124
125    /// Get the cell ID (convenience method)
126    fn get_id(&self) -> Option<&str>;
127}
128
129impl CellStateExt for CellState {
130    fn new(config: CellConfig) -> Self {
131        Self {
132            config: Some(config),
133            leader_id: None,
134            members: Vec::new(),
135            capabilities: Vec::new(),
136            platoon_id: None,
137            timestamp: Some(peat_schema::common::v1::Timestamp {
138                seconds: std::time::SystemTime::now()
139                    .duration_since(std::time::UNIX_EPOCH)
140                    .unwrap()
141                    .as_secs(),
142                nanos: 0,
143            }),
144        }
145    }
146
147    fn update_timestamp(&mut self) {
148        self.timestamp = Some(peat_schema::common::v1::Timestamp {
149            seconds: std::time::SystemTime::now()
150                .duration_since(std::time::UNIX_EPOCH)
151                .unwrap()
152                .as_secs(),
153            nanos: 0,
154        });
155    }
156
157    fn is_full(&self) -> bool {
158        if let Some(ref config) = self.config {
159            self.members.len() >= config.max_size as usize
160        } else {
161            false
162        }
163    }
164
165    fn is_valid(&self) -> bool {
166        if let Some(ref config) = self.config {
167            self.members.len() >= config.min_size as usize
168        } else {
169            false
170        }
171    }
172
173    fn add_member(&mut self, node_id: String) -> bool {
174        if self.is_full() {
175            false
176        } else {
177            // Check if already a member
178            if self.members.contains(&node_id) {
179                false
180            } else {
181                self.members.push(node_id);
182                self.update_timestamp();
183                true
184            }
185        }
186    }
187
188    fn remove_member(&mut self, node_id: &str) -> bool {
189        if let Some(pos) = self.members.iter().position(|id| id == node_id) {
190            self.members.remove(pos);
191            self.update_timestamp();
192            // If leader is removed, clear leader
193            if self.leader_id.as_deref() == Some(node_id) {
194                self.leader_id = None;
195            }
196            true
197        } else {
198            false
199        }
200    }
201
202    fn set_leader(&mut self, node_id: String) -> Result<(), &'static str> {
203        if !self.members.contains(&node_id) {
204            return Err("Leader must be a squad member");
205        }
206        self.leader_id = Some(node_id);
207        self.update_timestamp();
208        Ok(())
209    }
210
211    fn clear_leader(&mut self) {
212        self.leader_id = None;
213        self.update_timestamp();
214    }
215
216    fn add_capability(&mut self, capability: Capability) {
217        // Check if capability already exists (by ID)
218        if !self.capabilities.iter().any(|c| c.id == capability.id) {
219            self.capabilities.push(capability);
220            self.update_timestamp();
221        }
222    }
223
224    fn get_capabilities_by_type(
225        &self,
226        capability_type: crate::models::CapabilityType,
227    ) -> Vec<&Capability> {
228        self.capabilities
229            .iter()
230            .filter(|c| c.get_capability_type() == capability_type)
231            .collect()
232    }
233
234    fn has_capability_type(&self, capability_type: crate::models::CapabilityType) -> bool {
235        self.capabilities
236            .iter()
237            .any(|c| c.get_capability_type() == capability_type)
238    }
239
240    fn assign_platoon(&mut self, platoon_id: String) {
241        self.platoon_id = Some(platoon_id);
242        self.update_timestamp();
243    }
244
245    fn leave_platoon(&mut self) {
246        self.platoon_id = None;
247        self.update_timestamp();
248    }
249
250    fn merge(&mut self, other: &CellState) {
251        // Merge members (OR-Set union)
252        for member in &other.members {
253            if !self.members.contains(member) {
254                self.members.push(member.clone());
255            }
256        }
257
258        // Merge capabilities (G-Set union)
259        for cap in &other.capabilities {
260            if !self.capabilities.iter().any(|c| c.id == cap.id) {
261                self.capabilities.push(cap.clone());
262            }
263        }
264
265        // Merge leader and platoon (LWW-Register - take newer)
266        let self_ts = self.timestamp.as_ref().map(|t| t.seconds).unwrap_or(0);
267        let other_ts = other.timestamp.as_ref().map(|t| t.seconds).unwrap_or(0);
268
269        if other_ts > self_ts {
270            self.leader_id = other.leader_id.clone();
271            self.platoon_id = other.platoon_id.clone();
272            self.timestamp = other.timestamp;
273        }
274    }
275
276    fn member_count(&self) -> usize {
277        self.members.len()
278    }
279
280    fn is_member(&self, node_id: &str) -> bool {
281        self.members.iter().any(|id| id == node_id)
282    }
283
284    fn is_leader(&self, node_id: &str) -> bool {
285        self.leader_id.as_deref() == Some(node_id)
286    }
287
288    fn get_id(&self) -> Option<&str> {
289        self.config.as_ref().map(|c| c.id.as_str())
290    }
291}
292
293#[cfg(test)]
294mod tests;