nomad_protocol/extensions/
metadata.rs1use super::negotiation::{ext_type, Extension, NegotiationError};
31use std::collections::BTreeMap;
32use std::time::{Duration, SystemTime, UNIX_EPOCH};
33
/// Capability bits stored in `MetadataConfig::flags`.
///
/// Each constant occupies its own bit, so values can be OR-ed together and
/// tested individually with a bitwise AND.
pub mod metadata_config_flags {
    /// Bit 0: timestamps are supported.
    pub const TIMESTAMPS: u8 = 1 << 0;
    /// Bit 1: user ids are supported.
    pub const USER_IDS: u8 = 1 << 1;
    /// Bit 2: causality information (vector clocks) is supported.
    pub const CAUSALITY: u8 = 1 << 2;
    /// Bit 3: custom payloads are supported.
    pub const CUSTOM: u8 = 1 << 3;
}
45
/// Bits of the leading presence byte of an encoded `Metadata` record.
///
/// A set bit means the corresponding field follows in the encoding; fields
/// appear in the order timestamp, user id, causality, custom.
pub mod metadata_presence_flags {
    /// Bit 0: an 8-byte timestamp is present.
    pub const TIMESTAMP: u8 = 1 << 0;
    /// Bit 1: a length-prefixed user id is present.
    pub const USER_ID: u8 = 1 << 1;
    /// Bit 2: an encoded vector clock is present.
    pub const CAUSALITY: u8 = 1 << 2;
    /// Bit 3: a length-prefixed custom payload is present.
    pub const CUSTOM: u8 = 1 << 3;
}
57
58#[derive(Debug, Clone, PartialEq, Eq)]
60pub struct MetadataConfig {
61 pub flags: u8,
63 pub max_custom_size: u16,
65 pub max_causality_entries: u8,
67}
68
69impl Default for MetadataConfig {
70 fn default() -> Self {
71 Self {
72 flags: metadata_config_flags::TIMESTAMPS | metadata_config_flags::USER_IDS,
73 max_custom_size: 256,
74 max_causality_entries: 16,
75 }
76 }
77}
78
79impl MetadataConfig {
80 pub fn full() -> Self {
82 Self {
83 flags: metadata_config_flags::TIMESTAMPS
84 | metadata_config_flags::USER_IDS
85 | metadata_config_flags::CAUSALITY
86 | metadata_config_flags::CUSTOM,
87 max_custom_size: 1024,
88 max_causality_entries: 32,
89 }
90 }
91
92 pub fn minimal() -> Self {
94 Self {
95 flags: metadata_config_flags::TIMESTAMPS,
96 max_custom_size: 0,
97 max_causality_entries: 0,
98 }
99 }
100
101 pub fn supports_timestamps(&self) -> bool {
103 (self.flags & metadata_config_flags::TIMESTAMPS) != 0
104 }
105
106 pub fn supports_user_ids(&self) -> bool {
108 (self.flags & metadata_config_flags::USER_IDS) != 0
109 }
110
111 pub fn supports_causality(&self) -> bool {
113 (self.flags & metadata_config_flags::CAUSALITY) != 0
114 }
115
116 pub fn supports_custom(&self) -> bool {
118 (self.flags & metadata_config_flags::CUSTOM) != 0
119 }
120
121 pub const fn wire_size() -> usize {
123 4 }
125
126 pub fn to_extension(&self) -> Extension {
128 let mut data = Vec::with_capacity(Self::wire_size());
129 data.push(self.flags);
130 data.extend_from_slice(&self.max_custom_size.to_le_bytes());
131 data.push(self.max_causality_entries);
132 Extension::new(ext_type::METADATA, data)
133 }
134
135 pub fn from_extension(ext: &Extension) -> Option<Self> {
137 if ext.ext_type != ext_type::METADATA || ext.data.len() < Self::wire_size() {
138 return None;
139 }
140 Some(Self {
141 flags: ext.data[0],
142 max_custom_size: u16::from_le_bytes([ext.data[1], ext.data[2]]),
143 max_causality_entries: ext.data[3],
144 })
145 }
146
147 pub fn negotiate(client: &Self, server: &Self) -> Self {
149 Self {
150 flags: client.flags & server.flags,
151 max_custom_size: client.max_custom_size.min(server.max_custom_size),
152 max_causality_entries: client.max_causality_entries.min(server.max_causality_entries),
153 }
154 }
155}
156
157#[derive(Debug, Clone, PartialEq, Eq, Default)]
159pub struct VectorClock {
160 entries: BTreeMap<String, u64>,
162}
163
164impl VectorClock {
165 pub fn new() -> Self {
167 Self::default()
168 }
169
170 pub fn increment(&mut self, participant: &str) {
172 let counter = self.entries.entry(participant.to_string()).or_insert(0);
173 *counter += 1;
174 }
175
176 pub fn get(&self, participant: &str) -> u64 {
178 self.entries.get(participant).copied().unwrap_or(0)
179 }
180
181 pub fn set(&mut self, participant: String, counter: u64) {
183 self.entries.insert(participant, counter);
184 }
185
186 pub fn merge(&mut self, other: &VectorClock) {
188 for (id, &counter) in &other.entries {
189 let entry = self.entries.entry(id.clone()).or_insert(0);
190 *entry = (*entry).max(counter);
191 }
192 }
193
194 pub fn happens_before(&self, other: &VectorClock) -> bool {
196 let mut dominated = false;
197
198 for (id, &counter) in &self.entries {
200 let other_counter = other.get(id);
201 if counter > other_counter {
202 return false;
203 }
204 if counter < other_counter {
205 dominated = true;
206 }
207 }
208
209 for (id, &counter) in &other.entries {
211 if !self.entries.contains_key(id) && counter > 0 {
212 dominated = true;
213 }
214 }
215
216 dominated
217 }
218
219 pub fn is_concurrent_with(&self, other: &VectorClock) -> bool {
221 !self.happens_before(other) && !other.happens_before(self)
222 }
223
224 pub fn len(&self) -> usize {
226 self.entries.len()
227 }
228
229 pub fn is_empty(&self) -> bool {
231 self.entries.is_empty()
232 }
233
234 pub fn iter(&self) -> impl Iterator<Item = (&String, &u64)> {
236 self.entries.iter()
237 }
238
239 pub fn encode(&self) -> Vec<u8> {
241 let mut buf = Vec::new();
242 buf.push(self.entries.len() as u8);
243
244 for (id, counter) in &self.entries {
245 buf.push(id.len() as u8);
246 buf.extend_from_slice(id.as_bytes());
247 buf.extend_from_slice(&counter.to_le_bytes());
248 }
249
250 buf
251 }
252
253 pub fn decode(data: &[u8]) -> Result<(Self, usize), NegotiationError> {
255 if data.is_empty() {
256 return Err(NegotiationError::TooShort {
257 expected: 1,
258 actual: 0,
259 });
260 }
261
262 let count = data[0] as usize;
263 let mut offset = 1;
264 let mut clock = Self::new();
265
266 for _ in 0..count {
267 if offset >= data.len() {
268 return Err(NegotiationError::TooShort {
269 expected: offset + 1,
270 actual: data.len(),
271 });
272 }
273
274 let id_len = data[offset] as usize;
275 offset += 1;
276
277 if offset + id_len + 8 > data.len() {
278 return Err(NegotiationError::TooShort {
279 expected: offset + id_len + 8,
280 actual: data.len(),
281 });
282 }
283
284 let id = String::from_utf8(data[offset..offset + id_len].to_vec())
285 .map_err(|_| NegotiationError::InvalidData)?;
286 offset += id_len;
287
288 let counter = u64::from_le_bytes(
289 data[offset..offset + 8]
290 .try_into()
291 .expect("length checked"),
292 );
293 offset += 8;
294
295 clock.set(id, counter);
296 }
297
298 Ok((clock, offset))
299 }
300}
301
302#[derive(Debug, Clone, Default)]
304pub struct Metadata {
305 pub timestamp: Option<u64>,
307 pub user_id: Option<String>,
309 pub causality: Option<VectorClock>,
311 pub custom: Option<Vec<u8>>,
313}
314
315impl Metadata {
316 pub fn new() -> Self {
318 Self::default()
319 }
320
321 pub fn with_timestamp() -> Self {
323 let timestamp = SystemTime::now()
324 .duration_since(UNIX_EPOCH)
325 .unwrap_or_default()
326 .as_micros() as u64;
327 Self {
328 timestamp: Some(timestamp),
329 ..Default::default()
330 }
331 }
332
333 pub fn set_timestamp(&mut self, time: SystemTime) {
335 self.timestamp = Some(
336 time.duration_since(UNIX_EPOCH)
337 .unwrap_or_default()
338 .as_micros() as u64,
339 );
340 }
341
342 pub fn get_timestamp(&self) -> Option<SystemTime> {
344 self.timestamp.map(|micros| {
345 UNIX_EPOCH + Duration::from_micros(micros)
346 })
347 }
348
349 pub fn set_user_id(&mut self, user_id: impl Into<String>) {
351 self.user_id = Some(user_id.into());
352 }
353
354 pub fn set_causality(&mut self, clock: VectorClock) {
356 self.causality = Some(clock);
357 }
358
359 pub fn set_custom(&mut self, data: Vec<u8>) {
361 self.custom = Some(data);
362 }
363
364 pub fn is_empty(&self) -> bool {
366 self.timestamp.is_none()
367 && self.user_id.is_none()
368 && self.causality.is_none()
369 && self.custom.is_none()
370 }
371
372 fn presence_flags(&self) -> u8 {
374 let mut flags = 0u8;
375 if self.timestamp.is_some() {
376 flags |= metadata_presence_flags::TIMESTAMP;
377 }
378 if self.user_id.is_some() {
379 flags |= metadata_presence_flags::USER_ID;
380 }
381 if self.causality.is_some() {
382 flags |= metadata_presence_flags::CAUSALITY;
383 }
384 if self.custom.is_some() {
385 flags |= metadata_presence_flags::CUSTOM;
386 }
387 flags
388 }
389
390 pub fn encode(&self) -> Vec<u8> {
392 let mut buf = Vec::new();
393 buf.push(self.presence_flags());
394
395 if let Some(ts) = self.timestamp {
396 buf.extend_from_slice(&ts.to_le_bytes());
397 }
398
399 if let Some(ref user_id) = self.user_id {
400 buf.push(user_id.len() as u8);
401 buf.extend_from_slice(user_id.as_bytes());
402 }
403
404 if let Some(ref causality) = self.causality {
405 buf.extend_from_slice(&causality.encode());
406 }
407
408 if let Some(ref custom) = self.custom {
409 buf.extend_from_slice(&(custom.len() as u16).to_le_bytes());
410 buf.extend_from_slice(custom);
411 }
412
413 buf
414 }
415
416 pub fn decode(data: &[u8]) -> Result<(Self, usize), NegotiationError> {
418 if data.is_empty() {
419 return Err(NegotiationError::TooShort {
420 expected: 1,
421 actual: 0,
422 });
423 }
424
425 let flags = data[0];
426 let mut offset = 1;
427 let mut metadata = Self::new();
428
429 if (flags & metadata_presence_flags::TIMESTAMP) != 0 {
430 if offset + 8 > data.len() {
431 return Err(NegotiationError::TooShort {
432 expected: offset + 8,
433 actual: data.len(),
434 });
435 }
436 metadata.timestamp = Some(u64::from_le_bytes(
437 data[offset..offset + 8]
438 .try_into()
439 .expect("length checked"),
440 ));
441 offset += 8;
442 }
443
444 if (flags & metadata_presence_flags::USER_ID) != 0 {
445 if offset >= data.len() {
446 return Err(NegotiationError::TooShort {
447 expected: offset + 1,
448 actual: data.len(),
449 });
450 }
451 let id_len = data[offset] as usize;
452 offset += 1;
453
454 if offset + id_len > data.len() {
455 return Err(NegotiationError::TooShort {
456 expected: offset + id_len,
457 actual: data.len(),
458 });
459 }
460 metadata.user_id = Some(
461 String::from_utf8(data[offset..offset + id_len].to_vec())
462 .map_err(|_| NegotiationError::InvalidData)?,
463 );
464 offset += id_len;
465 }
466
467 if (flags & metadata_presence_flags::CAUSALITY) != 0 {
468 let (clock, consumed) = VectorClock::decode(&data[offset..])?;
469 metadata.causality = Some(clock);
470 offset += consumed;
471 }
472
473 if (flags & metadata_presence_flags::CUSTOM) != 0 {
474 if offset + 2 > data.len() {
475 return Err(NegotiationError::TooShort {
476 expected: offset + 2,
477 actual: data.len(),
478 });
479 }
480 let custom_len = u16::from_le_bytes([data[offset], data[offset + 1]]) as usize;
481 offset += 2;
482
483 if offset + custom_len > data.len() {
484 return Err(NegotiationError::TooShort {
485 expected: offset + custom_len,
486 actual: data.len(),
487 });
488 }
489 metadata.custom = Some(data[offset..offset + custom_len].to_vec());
490 offset += custom_len;
491 }
492
493 Ok((metadata, offset))
494 }
495}
496
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a clock from `(participant, counter)` pairs.
    fn clock_of(pairs: &[(&str, u64)]) -> VectorClock {
        let mut clock = VectorClock::new();
        for &(id, counter) in pairs {
            clock.set(id.to_string(), counter);
        }
        clock
    }

    #[test]
    fn test_config_default() {
        let config = MetadataConfig::default();
        // Enabled by default.
        assert!(config.supports_timestamps());
        assert!(config.supports_user_ids());
        // Opt-in only.
        assert!(!config.supports_causality());
        assert!(!config.supports_custom());
    }

    #[test]
    fn test_config_extension_roundtrip() {
        let original = MetadataConfig::full();
        let roundtripped = MetadataConfig::from_extension(&original.to_extension()).unwrap();
        assert_eq!(roundtripped, original);
    }

    #[test]
    fn test_config_negotiate() {
        let negotiated =
            MetadataConfig::negotiate(&MetadataConfig::full(), &MetadataConfig::minimal());

        // Only the capability both sides share survives.
        assert!(negotiated.supports_timestamps());
        assert!(!negotiated.supports_user_ids());
        assert!(!negotiated.supports_causality());
        assert!(!negotiated.supports_custom());
    }

    #[test]
    fn test_vector_clock_increment() {
        let mut clock = VectorClock::new();
        for participant in ["alice", "alice", "bob"].iter() {
            clock.increment(participant);
        }

        assert_eq!(clock.get("alice"), 2);
        assert_eq!(clock.get("bob"), 1);
        // Unknown participants read as zero.
        assert_eq!(clock.get("charlie"), 0);
    }

    #[test]
    fn test_vector_clock_merge() {
        let mut merged = clock_of(&[("alice", 3), ("bob", 1)]);
        let incoming = clock_of(&[("alice", 1), ("bob", 5), ("charlie", 2)]);

        merged.merge(&incoming);

        // Larger local value is kept.
        assert_eq!(merged.get("alice"), 3);
        // Larger incoming value wins.
        assert_eq!(merged.get("bob"), 5);
        // Entry known only to the other clock is adopted.
        assert_eq!(merged.get("charlie"), 2);
    }

    #[test]
    fn test_vector_clock_happens_before() {
        let earlier = clock_of(&[("alice", 1), ("bob", 2)]);
        let later = clock_of(&[("alice", 2), ("bob", 3)]);

        assert!(earlier.happens_before(&later));
        assert!(!later.happens_before(&earlier));
    }

    #[test]
    fn test_vector_clock_concurrent() {
        let left = clock_of(&[("alice", 2), ("bob", 1)]);
        let right = clock_of(&[("alice", 1), ("bob", 2)]);

        assert!(left.is_concurrent_with(&right));
    }

    #[test]
    fn test_vector_clock_roundtrip() {
        let clock = clock_of(&[("user1", 10), ("user2", 20)]);

        let (decoded, _) = VectorClock::decode(&clock.encode()).unwrap();

        assert_eq!(decoded.get("user1"), 10);
        assert_eq!(decoded.get("user2"), 20);
    }

    #[test]
    fn test_metadata_empty() {
        let metadata = Metadata::new();
        assert!(metadata.is_empty());

        // An empty record is just the presence byte, with no bits set.
        let encoded = metadata.encode();
        assert_eq!(encoded.len(), 1);
        assert_eq!(encoded[0], 0);
    }

    #[test]
    fn test_metadata_timestamp_only() {
        let metadata = Metadata {
            timestamp: Some(1234567890),
            ..Metadata::new()
        };

        let (decoded, _) = Metadata::decode(&metadata.encode()).unwrap();

        assert_eq!(decoded.timestamp, Some(1234567890));
        assert!(decoded.user_id.is_none());
    }

    #[test]
    fn test_metadata_full_roundtrip() {
        let metadata = Metadata {
            timestamp: Some(9999999),
            user_id: Some("test-user".to_string()),
            causality: Some(clock_of(&[("alice", 5)])),
            custom: Some(vec![0xDE, 0xAD, 0xBE, 0xEF]),
        };

        let (decoded, _) = Metadata::decode(&metadata.encode()).unwrap();

        assert_eq!(decoded.timestamp, Some(9999999));
        assert_eq!(decoded.user_id, Some("test-user".to_string()));
        assert!(decoded.causality.is_some());
        assert_eq!(decoded.causality.as_ref().unwrap().get("alice"), 5);
        assert_eq!(decoded.custom, Some(vec![0xDE, 0xAD, 0xBE, 0xEF]));
    }

    #[test]
    fn test_metadata_with_timestamp() {
        let metadata = Metadata::with_timestamp();
        assert!(matches!(metadata.timestamp, Some(micros) if micros > 0));
    }

    #[test]
    fn test_decode_truncated() {
        // Presence bit set but the field itself is missing.
        let truncated_inputs = [
            [metadata_presence_flags::TIMESTAMP],
            [metadata_presence_flags::USER_ID],
        ];

        for input in truncated_inputs.iter() {
            assert!(matches!(
                Metadata::decode(input),
                Err(NegotiationError::TooShort { .. })
            ));
        }
    }
}