peat-protocol 0.9.0-rc.8

Peat Coordination Protocol — hierarchical capability composition over CRDTs for heterogeneous mesh networks
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
//! Geographic self-organization strategy for bootstrap phase
//!
//! Implements geohash-based spatial clustering for autonomous cell formation.
//!
//! # Architecture
//!
//! This module implements a two-layer beacon management system:
//!
//! ## Ditto Layer (Mesh Network)
//! - Each platform maintains ONE beacon document: `platform_beacons/{platform_id}`
//! - Documents have a 30-second TTL for automatic expiration
//! - Updates are LWW-Register CRDTs (no write conflicts)
//! - Platforms query by geohash for proximity-based discovery
//!
//! ## Local Memory Layer (GeographicDiscovery)
//! - Each platform maintains an in-memory cache of received beacons
//! - Janitor service periodically cleans expired beacons from cache
//! - Provides defense-in-depth against stale data
//!
//! See: docs/ADR-002-Beacon-Storage-Architecture.md

use crate::discovery::geo::GeoCoordinate;
use crate::models::capability::Capability;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

/// Geohash precision for ~153m cells
pub const DEFAULT_GEOHASH_PRECISION: usize = 7;

/// Beacon Time-To-Live (seconds)
///
/// Beacons expire after this duration if not updated. This value is used in:
/// - Ditto document TTL (automatic mesh cleanup)
/// - Local cache expiration checks (janitor service)
///
/// Balances:
/// - Responsiveness: Detect offline nodes quickly
/// - Network efficiency: Reduce unnecessary re-broadcasts
/// - DDIL tolerance: Account for intermittent connectivity
pub const BEACON_TTL_SECONDS: u64 = 30;

/// Minimum nodes required to form a squad
pub const MIN_SQUAD_SIZE: usize = 2;

/// Node beacon for geographic discovery
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GeographicBeacon {
    pub platform_id: String,
    pub position: GeoCoordinate,
    pub geohash_cell: String,
    pub operational: bool,
    pub timestamp: u64,
    pub capabilities: Vec<String>,
}

impl GeographicBeacon {
    /// Create a new geographic beacon
    pub fn new(
        platform_id: String,
        position: GeoCoordinate,
        capabilities: Vec<Capability>,
    ) -> Self {
        let geohash_cell = encode_geohash(&position, DEFAULT_GEOHASH_PRECISION);
        let timestamp = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs();

        Self {
            platform_id,
            position,
            geohash_cell,
            operational: true,
            timestamp,
            capabilities: capabilities.iter().map(|c| c.id.clone()).collect(),
        }
    }

    /// Check if beacon is expired
    ///
    /// Returns true if the beacon timestamp is older than BEACON_TTL_SECONDS.
    /// This matches the Ditto document TTL for consistency.
    pub fn is_expired(&self, current_time: u64) -> bool {
        current_time.saturating_sub(self.timestamp) > BEACON_TTL_SECONDS
    }
}

/// Geographic cluster of nodes in the same geohash cell
#[derive(Debug, Clone)]
pub struct GeographicCluster {
    pub geohash_cell: String,
    pub platforms: Vec<GeographicBeacon>,
    pub center: GeoCoordinate,
}

impl GeographicCluster {
    /// Create a new cluster for a geohash cell
    pub fn new(geohash_cell: String) -> Result<Self, &'static str> {
        let center = decode_geohash(&geohash_cell)?;
        Ok(Self {
            geohash_cell,
            platforms: Vec::new(),
            center,
        })
    }

    /// Add a beacon to the cluster
    pub fn add_beacon(&mut self, beacon: GeographicBeacon) {
        self.platforms.push(beacon);
    }

    /// Remove expired beacons
    pub fn remove_expired(&mut self, current_time: u64) {
        self.platforms.retain(|b| !b.is_expired(current_time));
    }

    /// Check if cluster has enough nodes to form a squad
    pub fn can_form_squad(&self, min_size: usize) -> bool {
        self.platforms.len() >= min_size
    }

    /// Get platform IDs in this cluster
    pub fn platform_ids(&self) -> Vec<String> {
        self.platforms
            .iter()
            .map(|b| b.platform_id.clone())
            .collect()
    }
}

/// Encode a geographic coordinate as a geohash
pub fn encode_geohash(coord: &GeoCoordinate, precision: usize) -> String {
    crate::geohash::encode(coord.lon, coord.lat, precision).expect("Valid coordinate")
}

/// Decode a geohash back to a coordinate
pub fn decode_geohash(hash: &str) -> Result<GeoCoordinate, &'static str> {
    let ((lon, lat), _, _) = crate::geohash::decode(hash).map_err(|_| "Invalid geohash")?;
    GeoCoordinate::new(lat, lon, 0.0)
}

/// Geographic discovery manager for organizing nodes into squads
///
/// # Architecture
///
/// GeographicDiscovery maintains an in-memory cache of received beacons from the
/// Ditto mesh network. This cache requires periodic cleanup via a janitor service.
///
/// ## Usage Pattern
///
/// ```rust,ignore
/// // Create discovery manager
/// let discovery = Arc::new(Mutex::new(
///     GeographicDiscovery::new("node_1".to_string())
/// ));
///
/// // Spawn janitor service (runs periodically)
/// let janitor_discovery = discovery.clone();
/// tokio::spawn(async move {
///     let mut interval = tokio::time::interval(Duration::from_secs(10));
///     loop {
///         interval.tick().await;
///         janitor_discovery.lock().unwrap().cleanup_expired();
///     }
/// });
///
/// // Process beacons from Ditto
/// ditto.observe_beacons(|beacon| {
///     discovery.lock().unwrap().process_beacon(beacon);
/// });
/// ```
pub struct GeographicDiscovery {
    clusters: HashMap<String, GeographicCluster>,
    my_platform_id: String,
}

impl GeographicDiscovery {
    /// Create a new geographic discovery manager
    pub fn new(platform_id: String) -> Self {
        Self {
            clusters: HashMap::new(),
            my_platform_id: platform_id,
        }
    }

    /// Process a received beacon
    pub fn process_beacon(&mut self, beacon: GeographicBeacon) {
        let geohash = beacon.geohash_cell.clone();

        self.clusters
            .entry(geohash.clone())
            .or_insert_with(|| GeographicCluster::new(geohash).unwrap())
            .add_beacon(beacon);
    }

    /// Clean up expired beacons
    pub fn cleanup_expired(&mut self) {
        let current_time = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs();

        for cluster in self.clusters.values_mut() {
            cluster.remove_expired(current_time);
        }

        // Remove empty clusters
        self.clusters
            .retain(|_, cluster| !cluster.platforms.is_empty());
    }

    /// Find clusters that can form squads
    pub fn find_formable_squads(&self, min_size: usize) -> Vec<&GeographicCluster> {
        self.clusters
            .values()
            .filter(|c| c.can_form_squad(min_size))
            .collect()
    }

    /// Get the cluster containing this platform
    pub fn my_cluster(&self) -> Option<&GeographicCluster> {
        self.clusters.values().find(|c| {
            c.platforms
                .iter()
                .any(|b| b.platform_id == self.my_platform_id)
        })
    }

    /// Check if this platform should initiate cell formation
    /// Returns true if this platform is the "leader" (lowest ID) in its cluster
    pub fn should_initiate_squad_formation(&self) -> bool {
        if let Some(cluster) = self.my_cluster() {
            if cluster.can_form_squad(MIN_SQUAD_SIZE) {
                // Check if we're the lowest platform ID (deterministic leader selection)
                if let Some(min_id) = cluster.platforms.iter().map(|b| &b.platform_id).min() {
                    return min_id == &self.my_platform_id;
                }
            }
        }
        false
    }

    /// Get proposed squad members from my cluster
    pub fn get_squad_members(&self, max_size: usize) -> Option<Vec<String>> {
        if let Some(cluster) = self.my_cluster() {
            if cluster.can_form_squad(MIN_SQUAD_SIZE) {
                let mut members = cluster.platform_ids();
                members.sort(); // Deterministic ordering
                members.truncate(max_size);
                return Some(members);
            }
        }
        None
    }

    /// Get total number of discovered platforms
    pub fn total_platforms(&self) -> usize {
        self.clusters.values().map(|c| c.platforms.len()).sum()
    }

    /// Get number of active clusters
    pub fn cluster_count(&self) -> usize {
        self.clusters.len()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::CapabilityExt;

    #[test]
    fn test_geohash_encoding() {
        let coord = GeoCoordinate::new(37.7749, -122.4194, 100.0).unwrap();
        let hash = encode_geohash(&coord, 7);
        assert_eq!(hash.len(), 7);
        assert!(hash.starts_with("9q8yy")); // SF area
    }

    #[test]
    fn test_geohash_decoding() {
        let hash = "9q8yyk8";
        let coord = decode_geohash(hash).unwrap();
        // Should be approximately SF coordinates
        assert!((coord.lat - 37.77).abs() < 0.01);
        assert!((coord.lon - (-122.41)).abs() < 0.01);
    }

    #[test]
    fn test_beacon_creation() {
        use crate::models::capability::CapabilityType;

        let pos = GeoCoordinate::new(37.7749, -122.4194, 100.0).unwrap();
        let caps = vec![
            Capability::new(
                "intel1".to_string(),
                "Intelligence".to_string(),
                CapabilityType::Sensor,
                0.9,
            ),
            Capability::new(
                "comms1".to_string(),
                "Communications".to_string(),
                CapabilityType::Communication,
                0.95,
            ),
        ];

        let beacon = GeographicBeacon::new("node_1".to_string(), pos, caps);

        assert_eq!(beacon.platform_id, "node_1");
        assert_eq!(beacon.position, pos);
        assert!(beacon.geohash_cell.starts_with("9q8yy"));
        assert!(beacon.operational);
        assert_eq!(beacon.capabilities.len(), 2);
    }

    #[test]
    fn test_beacon_expiration() {
        let pos = GeoCoordinate::new(37.7749, -122.4194, 100.0).unwrap();
        let beacon = GeographicBeacon::new("node_1".to_string(), pos, vec![]);

        // Not expired immediately
        assert!(!beacon.is_expired(beacon.timestamp));

        // Expired after 31 seconds
        assert!(beacon.is_expired(beacon.timestamp + 31));
    }

    #[test]
    fn test_cluster_creation() {
        let cluster = GeographicCluster::new("9q8yyk8".to_string()).unwrap();
        assert_eq!(cluster.geohash_cell, "9q8yyk8");
        assert_eq!(cluster.platforms.len(), 0);
        assert!(!cluster.can_form_squad(2));
    }

    #[test]
    fn test_cluster_beacon_management() {
        let mut cluster = GeographicCluster::new("9q8yyk8".to_string()).unwrap();
        let pos = GeoCoordinate::new(37.7749, -122.4194, 100.0).unwrap();

        let beacon1 = GeographicBeacon::new("node_1".to_string(), pos, vec![]);
        let beacon2 = GeographicBeacon::new("node_2".to_string(), pos, vec![]);

        cluster.add_beacon(beacon1);
        cluster.add_beacon(beacon2);

        assert_eq!(cluster.platforms.len(), 2);
        assert!(cluster.can_form_squad(2));

        let ids = cluster.platform_ids();
        assert!(ids.contains(&"node_1".to_string()));
        assert!(ids.contains(&"node_2".to_string()));
    }

    #[test]
    fn test_discovery_basic_operations() {
        let mut discovery = GeographicDiscovery::new("node_1".to_string());
        assert_eq!(discovery.total_platforms(), 0);
        assert_eq!(discovery.cluster_count(), 0);

        let pos = GeoCoordinate::new(37.7749, -122.4194, 100.0).unwrap();
        let beacon = GeographicBeacon::new("node_2".to_string(), pos, vec![]);

        discovery.process_beacon(beacon);

        assert_eq!(discovery.total_platforms(), 1);
        assert_eq!(discovery.cluster_count(), 1);
    }

    #[test]
    fn test_discovery_squad_formation() {
        let mut discovery = GeographicDiscovery::new("node_1".to_string());
        let pos = GeoCoordinate::new(37.7749, -122.4194, 100.0).unwrap();

        // Add own beacon
        let beacon1 = GeographicBeacon::new("node_1".to_string(), pos, vec![]);
        discovery.process_beacon(beacon1);

        // Add another platform in same location
        let beacon2 = GeographicBeacon::new("node_2".to_string(), pos, vec![]);
        discovery.process_beacon(beacon2);

        assert_eq!(discovery.total_platforms(), 2);

        // Should be able to form cells now
        let formable = discovery.find_formable_squads(2);
        assert_eq!(formable.len(), 1);

        // platform_1 should be leader (lowest ID)
        assert!(discovery.should_initiate_squad_formation());

        // Get squad members
        let members = discovery.get_squad_members(5).unwrap();
        assert_eq!(members.len(), 2);
        assert!(members.contains(&"node_1".to_string()));
        assert!(members.contains(&"node_2".to_string()));
    }

    #[test]
    fn test_discovery_multiple_clusters() {
        let mut discovery = GeographicDiscovery::new("node_1".to_string());

        // Cluster 1: SF
        let pos1 = GeoCoordinate::new(37.7749, -122.4194, 100.0).unwrap();
        let beacon1 = GeographicBeacon::new("node_1".to_string(), pos1, vec![]);
        let beacon2 = GeographicBeacon::new("node_2".to_string(), pos1, vec![]);

        // Cluster 2: LA (different geohash)
        let pos2 = GeoCoordinate::new(34.0522, -118.2437, 100.0).unwrap();
        let beacon3 = GeographicBeacon::new("node_3".to_string(), pos2, vec![]);
        let beacon4 = GeographicBeacon::new("platform_4".to_string(), pos2, vec![]);

        discovery.process_beacon(beacon1);
        discovery.process_beacon(beacon2);
        discovery.process_beacon(beacon3);
        discovery.process_beacon(beacon4);

        assert_eq!(discovery.total_platforms(), 4);
        assert_eq!(discovery.cluster_count(), 2);

        let formable = discovery.find_formable_squads(2);
        assert_eq!(formable.len(), 2);
    }

    #[test]
    fn test_discovery_cleanup_expired() {
        let mut discovery = GeographicDiscovery::new("node_1".to_string());
        let pos = GeoCoordinate::new(37.7749, -122.4194, 100.0).unwrap();

        let mut beacon = GeographicBeacon::new("node_2".to_string(), pos, vec![]);
        beacon.timestamp = 0; // Set to very old timestamp

        discovery.process_beacon(beacon);
        assert_eq!(discovery.total_platforms(), 1);

        discovery.cleanup_expired();
        assert_eq!(discovery.total_platforms(), 0);
        assert_eq!(discovery.cluster_count(), 0);
    }

    #[test]
    fn test_deterministic_leader_selection() {
        // Test that lowest ID is always selected as leader
        let mut discovery1 = GeographicDiscovery::new("platform_a".to_string());
        let mut discovery2 = GeographicDiscovery::new("platform_b".to_string());
        let mut discovery3 = GeographicDiscovery::new("platform_c".to_string());

        let pos = GeoCoordinate::new(37.7749, -122.4194, 100.0).unwrap();

        for id in ["platform_a", "platform_b", "platform_c"] {
            let beacon = GeographicBeacon::new(id.to_string(), pos, vec![]);
            discovery1.process_beacon(beacon.clone());
            discovery2.process_beacon(beacon.clone());
            discovery3.process_beacon(beacon);
        }

        // Only platform_a should be leader
        assert!(discovery1.should_initiate_squad_formation());
        assert!(!discovery2.should_initiate_squad_formation());
        assert!(!discovery3.should_initiate_squad_formation());
    }
}