nomad_protocol/extensions/
metadata.rs

1//! Metadata extension (0x0007)
2//!
3//! Allows attaching contextual metadata to sync updates, including:
4//! - Timestamps (for ordering, latency measurement)
5//! - User/session identifiers (for multi-user scenarios)
6//! - Causality information (vector clocks, happens-before)
7//! - Custom application-defined metadata
8//!
9//! Wire format for extension negotiation:
10//! ```text
11//! +0  Flags (1 byte)
12//!     - bit 0: Timestamps supported
13//!     - bit 1: User IDs supported
14//!     - bit 2: Causality tracking supported
15//!     - bit 3: Custom metadata supported
16//! +1  Max custom metadata size (2 bytes LE16)
17//! +3  Max causality entries (1 byte) - for vector clocks
18//! ```
19//!
20//! Wire format for metadata block (attached to sync messages):
21//! ```text
22//! +0  Present flags (1 byte) - which fields are present
23//! +1  [Optional] Timestamp (8 bytes LE64) - microseconds since epoch
24//! +N  [Optional] User ID length (1 byte) + User ID bytes
25//! +M  [Optional] Causality entry count (1 byte) + entries
26//!     - Each entry: User ID length (1) + User ID + Counter (8 bytes LE64)
27//! +K  [Optional] Custom length (2 bytes LE16) + Custom data
28//! ```
29
30use super::negotiation::{ext_type, Extension, NegotiationError};
31use std::collections::BTreeMap;
32use std::time::{Duration, SystemTime, UNIX_EPOCH};
33
/// Capability bits carried in the `Flags` byte of the negotiation payload.
///
/// Every constant occupies exactly one bit, so they can be OR-ed together.
pub mod metadata_config_flags {
    /// Bit 0: timestamps may be attached to sync updates.
    pub const TIMESTAMPS: u8 = 1 << 0;
    /// Bit 1: user/session identifiers may be attached.
    pub const USER_IDS: u8 = 1 << 1;
    /// Bit 2: causality information (vector clocks) may be attached.
    pub const CAUSALITY: u8 = 1 << 2;
    /// Bit 3: custom application-defined metadata may be attached.
    pub const CUSTOM: u8 = 1 << 3;
}
45
/// Bits of the `Present flags` byte that opens every encoded metadata block.
///
/// A set bit means the corresponding optional field follows on the wire.
pub mod metadata_presence_flags {
    /// Bit 0: an 8-byte timestamp follows.
    pub const TIMESTAMP: u8 = 1 << 0;
    /// Bit 1: a length-prefixed user ID follows.
    pub const USER_ID: u8 = 1 << 1;
    /// Bit 2: a causality (vector clock) section follows.
    pub const CAUSALITY: u8 = 1 << 2;
    /// Bit 3: a length-prefixed custom data section follows.
    pub const CUSTOM: u8 = 1 << 3;
}
57
/// Negotiable settings of the metadata extension.
///
/// Serialized as four bytes: flags (1), maximum custom size (2, little
/// endian) and maximum causality entries (1).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MetadataConfig {
    /// Bit set of supported features; see `metadata_config_flags`.
    pub flags: u8,
    /// Upper bound, in bytes, for the custom metadata section.
    pub max_custom_size: u16,
    /// Upper bound for the number of vector clock entries.
    pub max_causality_entries: u8,
}
68
69impl Default for MetadataConfig {
70    fn default() -> Self {
71        Self {
72            flags: metadata_config_flags::TIMESTAMPS | metadata_config_flags::USER_IDS,
73            max_custom_size: 256,
74            max_causality_entries: 16,
75        }
76    }
77}
78
79impl MetadataConfig {
80    /// Create config with all features
81    pub fn full() -> Self {
82        Self {
83            flags: metadata_config_flags::TIMESTAMPS
84                | metadata_config_flags::USER_IDS
85                | metadata_config_flags::CAUSALITY
86                | metadata_config_flags::CUSTOM,
87            max_custom_size: 1024,
88            max_causality_entries: 32,
89        }
90    }
91
92    /// Create minimal config (timestamps only)
93    pub fn minimal() -> Self {
94        Self {
95            flags: metadata_config_flags::TIMESTAMPS,
96            max_custom_size: 0,
97            max_causality_entries: 0,
98        }
99    }
100
101    /// Check if timestamps are supported
102    pub fn supports_timestamps(&self) -> bool {
103        (self.flags & metadata_config_flags::TIMESTAMPS) != 0
104    }
105
106    /// Check if user IDs are supported
107    pub fn supports_user_ids(&self) -> bool {
108        (self.flags & metadata_config_flags::USER_IDS) != 0
109    }
110
111    /// Check if causality tracking is supported
112    pub fn supports_causality(&self) -> bool {
113        (self.flags & metadata_config_flags::CAUSALITY) != 0
114    }
115
116    /// Check if custom metadata is supported
117    pub fn supports_custom(&self) -> bool {
118        (self.flags & metadata_config_flags::CUSTOM) != 0
119    }
120
121    /// Wire size
122    pub const fn wire_size() -> usize {
123        4 // flags(1) + max_custom(2) + max_causality(1)
124    }
125
126    /// Encode to extension
127    pub fn to_extension(&self) -> Extension {
128        let mut data = Vec::with_capacity(Self::wire_size());
129        data.push(self.flags);
130        data.extend_from_slice(&self.max_custom_size.to_le_bytes());
131        data.push(self.max_causality_entries);
132        Extension::new(ext_type::METADATA, data)
133    }
134
135    /// Decode from extension
136    pub fn from_extension(ext: &Extension) -> Option<Self> {
137        if ext.ext_type != ext_type::METADATA || ext.data.len() < Self::wire_size() {
138            return None;
139        }
140        Some(Self {
141            flags: ext.data[0],
142            max_custom_size: u16::from_le_bytes([ext.data[1], ext.data[2]]),
143            max_causality_entries: ext.data[3],
144        })
145    }
146
147    /// Negotiate between client and server
148    pub fn negotiate(client: &Self, server: &Self) -> Self {
149        Self {
150            flags: client.flags & server.flags,
151            max_custom_size: client.max_custom_size.min(server.max_custom_size),
152            max_causality_entries: client.max_causality_entries.min(server.max_causality_entries),
153        }
154    }
155}
156
/// Vector clock used to track causality between participants.
///
/// Participants without an entry are treated as having counter `0`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct VectorClock {
    /// Logical counter per participant ID, kept sorted by ID.
    entries: BTreeMap<String, u64>,
}
163
164impl VectorClock {
165    /// Create empty vector clock
166    pub fn new() -> Self {
167        Self::default()
168    }
169
170    /// Increment counter for a participant
171    pub fn increment(&mut self, participant: &str) {
172        let counter = self.entries.entry(participant.to_string()).or_insert(0);
173        *counter += 1;
174    }
175
176    /// Get counter for a participant
177    pub fn get(&self, participant: &str) -> u64 {
178        self.entries.get(participant).copied().unwrap_or(0)
179    }
180
181    /// Set counter for a participant
182    pub fn set(&mut self, participant: String, counter: u64) {
183        self.entries.insert(participant, counter);
184    }
185
186    /// Merge with another vector clock (take max of each entry)
187    pub fn merge(&mut self, other: &VectorClock) {
188        for (id, &counter) in &other.entries {
189            let entry = self.entries.entry(id.clone()).or_insert(0);
190            *entry = (*entry).max(counter);
191        }
192    }
193
194    /// Check if this clock happens-before another
195    pub fn happens_before(&self, other: &VectorClock) -> bool {
196        let mut dominated = false;
197
198        // Check all entries in self are <= other
199        for (id, &counter) in &self.entries {
200            let other_counter = other.get(id);
201            if counter > other_counter {
202                return false;
203            }
204            if counter < other_counter {
205                dominated = true;
206            }
207        }
208
209        // Check if other has any entries not in self
210        for (id, &counter) in &other.entries {
211            if !self.entries.contains_key(id) && counter > 0 {
212                dominated = true;
213            }
214        }
215
216        dominated
217    }
218
219    /// Check if two clocks are concurrent (neither happens-before the other)
220    pub fn is_concurrent_with(&self, other: &VectorClock) -> bool {
221        !self.happens_before(other) && !other.happens_before(self)
222    }
223
224    /// Number of entries
225    pub fn len(&self) -> usize {
226        self.entries.len()
227    }
228
229    /// Check if empty
230    pub fn is_empty(&self) -> bool {
231        self.entries.is_empty()
232    }
233
234    /// Iterate over entries
235    pub fn iter(&self) -> impl Iterator<Item = (&String, &u64)> {
236        self.entries.iter()
237    }
238
239    /// Encode to bytes
240    pub fn encode(&self) -> Vec<u8> {
241        let mut buf = Vec::new();
242        buf.push(self.entries.len() as u8);
243
244        for (id, counter) in &self.entries {
245            buf.push(id.len() as u8);
246            buf.extend_from_slice(id.as_bytes());
247            buf.extend_from_slice(&counter.to_le_bytes());
248        }
249
250        buf
251    }
252
253    /// Decode from bytes
254    pub fn decode(data: &[u8]) -> Result<(Self, usize), NegotiationError> {
255        if data.is_empty() {
256            return Err(NegotiationError::TooShort {
257                expected: 1,
258                actual: 0,
259            });
260        }
261
262        let count = data[0] as usize;
263        let mut offset = 1;
264        let mut clock = Self::new();
265
266        for _ in 0..count {
267            if offset >= data.len() {
268                return Err(NegotiationError::TooShort {
269                    expected: offset + 1,
270                    actual: data.len(),
271                });
272            }
273
274            let id_len = data[offset] as usize;
275            offset += 1;
276
277            if offset + id_len + 8 > data.len() {
278                return Err(NegotiationError::TooShort {
279                    expected: offset + id_len + 8,
280                    actual: data.len(),
281                });
282            }
283
284            let id = String::from_utf8(data[offset..offset + id_len].to_vec())
285                .map_err(|_| NegotiationError::InvalidData)?;
286            offset += id_len;
287
288            let counter = u64::from_le_bytes(
289                data[offset..offset + 8]
290                    .try_into()
291                    .expect("length checked"),
292            );
293            offset += 8;
294
295            clock.set(id, counter);
296        }
297
298        Ok((clock, offset))
299    }
300}
301
302/// Metadata attached to a sync message
303#[derive(Debug, Clone, Default)]
304pub struct Metadata {
305    /// Timestamp (microseconds since epoch)
306    pub timestamp: Option<u64>,
307    /// User/session identifier
308    pub user_id: Option<String>,
309    /// Causality information
310    pub causality: Option<VectorClock>,
311    /// Custom application data
312    pub custom: Option<Vec<u8>>,
313}
314
315impl Metadata {
316    /// Create empty metadata
317    pub fn new() -> Self {
318        Self::default()
319    }
320
321    /// Create with current timestamp
322    pub fn with_timestamp() -> Self {
323        let timestamp = SystemTime::now()
324            .duration_since(UNIX_EPOCH)
325            .unwrap_or_default()
326            .as_micros() as u64;
327        Self {
328            timestamp: Some(timestamp),
329            ..Default::default()
330        }
331    }
332
333    /// Set timestamp from SystemTime
334    pub fn set_timestamp(&mut self, time: SystemTime) {
335        self.timestamp = Some(
336            time.duration_since(UNIX_EPOCH)
337                .unwrap_or_default()
338                .as_micros() as u64,
339        );
340    }
341
342    /// Get timestamp as SystemTime
343    pub fn get_timestamp(&self) -> Option<SystemTime> {
344        self.timestamp.map(|micros| {
345            UNIX_EPOCH + Duration::from_micros(micros)
346        })
347    }
348
349    /// Set user ID
350    pub fn set_user_id(&mut self, user_id: impl Into<String>) {
351        self.user_id = Some(user_id.into());
352    }
353
354    /// Set causality
355    pub fn set_causality(&mut self, clock: VectorClock) {
356        self.causality = Some(clock);
357    }
358
359    /// Set custom data
360    pub fn set_custom(&mut self, data: Vec<u8>) {
361        self.custom = Some(data);
362    }
363
364    /// Check if empty
365    pub fn is_empty(&self) -> bool {
366        self.timestamp.is_none()
367            && self.user_id.is_none()
368            && self.causality.is_none()
369            && self.custom.is_none()
370    }
371
372    /// Compute presence flags
373    fn presence_flags(&self) -> u8 {
374        let mut flags = 0u8;
375        if self.timestamp.is_some() {
376            flags |= metadata_presence_flags::TIMESTAMP;
377        }
378        if self.user_id.is_some() {
379            flags |= metadata_presence_flags::USER_ID;
380        }
381        if self.causality.is_some() {
382            flags |= metadata_presence_flags::CAUSALITY;
383        }
384        if self.custom.is_some() {
385            flags |= metadata_presence_flags::CUSTOM;
386        }
387        flags
388    }
389
390    /// Encode to bytes
391    pub fn encode(&self) -> Vec<u8> {
392        let mut buf = Vec::new();
393        buf.push(self.presence_flags());
394
395        if let Some(ts) = self.timestamp {
396            buf.extend_from_slice(&ts.to_le_bytes());
397        }
398
399        if let Some(ref user_id) = self.user_id {
400            buf.push(user_id.len() as u8);
401            buf.extend_from_slice(user_id.as_bytes());
402        }
403
404        if let Some(ref causality) = self.causality {
405            buf.extend_from_slice(&causality.encode());
406        }
407
408        if let Some(ref custom) = self.custom {
409            buf.extend_from_slice(&(custom.len() as u16).to_le_bytes());
410            buf.extend_from_slice(custom);
411        }
412
413        buf
414    }
415
416    /// Decode from bytes
417    pub fn decode(data: &[u8]) -> Result<(Self, usize), NegotiationError> {
418        if data.is_empty() {
419            return Err(NegotiationError::TooShort {
420                expected: 1,
421                actual: 0,
422            });
423        }
424
425        let flags = data[0];
426        let mut offset = 1;
427        let mut metadata = Self::new();
428
429        if (flags & metadata_presence_flags::TIMESTAMP) != 0 {
430            if offset + 8 > data.len() {
431                return Err(NegotiationError::TooShort {
432                    expected: offset + 8,
433                    actual: data.len(),
434                });
435            }
436            metadata.timestamp = Some(u64::from_le_bytes(
437                data[offset..offset + 8]
438                    .try_into()
439                    .expect("length checked"),
440            ));
441            offset += 8;
442        }
443
444        if (flags & metadata_presence_flags::USER_ID) != 0 {
445            if offset >= data.len() {
446                return Err(NegotiationError::TooShort {
447                    expected: offset + 1,
448                    actual: data.len(),
449                });
450            }
451            let id_len = data[offset] as usize;
452            offset += 1;
453
454            if offset + id_len > data.len() {
455                return Err(NegotiationError::TooShort {
456                    expected: offset + id_len,
457                    actual: data.len(),
458                });
459            }
460            metadata.user_id = Some(
461                String::from_utf8(data[offset..offset + id_len].to_vec())
462                    .map_err(|_| NegotiationError::InvalidData)?,
463            );
464            offset += id_len;
465        }
466
467        if (flags & metadata_presence_flags::CAUSALITY) != 0 {
468            let (clock, consumed) = VectorClock::decode(&data[offset..])?;
469            metadata.causality = Some(clock);
470            offset += consumed;
471        }
472
473        if (flags & metadata_presence_flags::CUSTOM) != 0 {
474            if offset + 2 > data.len() {
475                return Err(NegotiationError::TooShort {
476                    expected: offset + 2,
477                    actual: data.len(),
478                });
479            }
480            let custom_len = u16::from_le_bytes([data[offset], data[offset + 1]]) as usize;
481            offset += 2;
482
483            if offset + custom_len > data.len() {
484                return Err(NegotiationError::TooShort {
485                    expected: offset + custom_len,
486                    actual: data.len(),
487                });
488            }
489            metadata.custom = Some(data[offset..offset + custom_len].to_vec());
490            offset += custom_len;
491        }
492
493        Ok((metadata, offset))
494    }
495}
496
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a vector clock from `(participant, counter)` pairs.
    fn clock_of(pairs: &[(&str, u64)]) -> VectorClock {
        let mut clock = VectorClock::new();
        for &(id, counter) in pairs {
            clock.set(id.to_string(), counter);
        }
        clock
    }

    // The default config enables timestamps and user IDs only.
    #[test]
    fn test_config_default() {
        let cfg = MetadataConfig::default();
        assert!(cfg.supports_timestamps());
        assert!(cfg.supports_user_ids());
        assert!(!cfg.supports_causality());
        assert!(!cfg.supports_custom());
    }

    // Encoding to an extension and decoding again yields the same config.
    #[test]
    fn test_config_extension_roundtrip() {
        let original = MetadataConfig::full();
        let roundtripped = MetadataConfig::from_extension(&original.to_extension());
        assert_eq!(roundtripped, Some(original));
    }

    // Negotiation keeps only the features both sides support.
    #[test]
    fn test_config_negotiate() {
        let negotiated =
            MetadataConfig::negotiate(&MetadataConfig::full(), &MetadataConfig::minimal());
        assert!(negotiated.supports_timestamps());
        assert!(!negotiated.supports_user_ids());
        assert!(!negotiated.supports_causality());
        assert!(!negotiated.supports_custom());
    }

    // Incrementing creates entries on demand; unknown participants read as 0.
    #[test]
    fn test_vector_clock_increment() {
        let mut clock = VectorClock::new();
        for participant in ["alice", "alice", "bob"] {
            clock.increment(participant);
        }

        assert_eq!(clock.get("alice"), 2);
        assert_eq!(clock.get("bob"), 1);
        assert_eq!(clock.get("charlie"), 0);
    }

    // Merging takes the per-participant maximum and adopts new entries.
    #[test]
    fn test_vector_clock_merge() {
        let mut merged = clock_of(&[("alice", 3), ("bob", 1)]);
        let incoming = clock_of(&[("alice", 1), ("bob", 5), ("charlie", 2)]);

        merged.merge(&incoming);

        assert_eq!(merged.get("alice"), 3); // max(3, 1)
        assert_eq!(merged.get("bob"), 5); // max(1, 5)
        assert_eq!(merged.get("charlie"), 2); // new entry
    }

    // A clock that is behind on every entry happens-before the other one.
    #[test]
    fn test_vector_clock_happens_before() {
        let earlier = clock_of(&[("alice", 1), ("bob", 2)]);
        let later = clock_of(&[("alice", 2), ("bob", 3)]);

        assert!(earlier.happens_before(&later));
        assert!(!later.happens_before(&earlier));
    }

    // Clocks that are each ahead on a different entry are concurrent.
    #[test]
    fn test_vector_clock_concurrent() {
        let left = clock_of(&[("alice", 2), ("bob", 1)]);
        let right = clock_of(&[("alice", 1), ("bob", 2)]);

        assert!(left.is_concurrent_with(&right));
    }

    // Counters survive an encode/decode cycle.
    #[test]
    fn test_vector_clock_roundtrip() {
        let wire = clock_of(&[("user1", 10), ("user2", 20)]).encode();
        let (decoded, _) = VectorClock::decode(&wire).unwrap();

        assert_eq!(decoded.get("user1"), 10);
        assert_eq!(decoded.get("user2"), 20);
    }

    // Empty metadata encodes to a single zero presence byte.
    #[test]
    fn test_metadata_empty() {
        let metadata = Metadata::new();
        assert!(metadata.is_empty());

        let wire = metadata.encode();
        assert_eq!(wire.len(), 1); // Just flags byte
        assert_eq!(wire[0], 0);
    }

    // A timestamp-only block decodes without inventing other fields.
    #[test]
    fn test_metadata_timestamp_only() {
        let metadata = Metadata {
            timestamp: Some(1234567890),
            ..Metadata::new()
        };

        let (decoded, _) = Metadata::decode(&metadata.encode()).unwrap();

        assert_eq!(decoded.timestamp, Some(1234567890));
        assert!(decoded.user_id.is_none());
    }

    // All four fields survive an encode/decode cycle.
    #[test]
    fn test_metadata_full_roundtrip() {
        let metadata = Metadata {
            timestamp: Some(9999999),
            user_id: Some("test-user".to_string()),
            causality: Some(clock_of(&[("alice", 5)])),
            custom: Some(vec![0xDE, 0xAD, 0xBE, 0xEF]),
        };

        let (decoded, _) = Metadata::decode(&metadata.encode()).unwrap();

        assert_eq!(decoded.timestamp, Some(9999999));
        assert_eq!(decoded.user_id, Some("test-user".to_string()));
        assert!(decoded.causality.is_some());
        assert_eq!(decoded.causality.as_ref().unwrap().get("alice"), 5);
        assert_eq!(decoded.custom, Some(vec![0xDE, 0xAD, 0xBE, 0xEF]));
    }

    // `with_timestamp` records a non-zero current time.
    #[test]
    fn test_metadata_with_timestamp() {
        let stamped = Metadata::with_timestamp();
        assert!(stamped.timestamp.is_some());
        assert!(matches!(stamped.timestamp, Some(micros) if micros > 0));
    }

    // A presence flag without the data it announces is rejected as too short.
    #[test]
    fn test_decode_truncated() {
        // Timestamp flag set but no data, then user ID flag set but no length.
        let lone_flags = [
            metadata_presence_flags::TIMESTAMP,
            metadata_presence_flags::USER_ID,
        ];
        for flag in lone_flags {
            assert!(matches!(
                Metadata::decode(&[flag]),
                Err(NegotiationError::TooShort { .. })
            ));
        }
    }
}