Skip to main content

pulsedb/sync/
types.rs

1//! Core types for the PulseDB sync protocol.
2//!
3//! This module defines the wire types used for synchronizing data between
4//! PulseDB instances: change payloads, cursors, handshake messages, and
5//! the instance identity type.
6
7use std::fmt;
8
9use serde::{Deserialize, Serialize};
10use uuid::Uuid;
11
12use crate::collective::Collective;
13use crate::experience::Experience;
14use crate::insight::DerivedInsight;
15use crate::relation::ExperienceRelation;
16use crate::types::{CollectiveId, ExperienceId, InsightId, RelationId, Timestamp};
17
18// ============================================================================
19// InstanceId — Unique identity for a PulseDB instance
20// ============================================================================
21
22/// Unique identifier for a PulseDB instance (UUID v7, time-ordered).
23///
24/// Each PulseDB database generates an `InstanceId` on first open and persists
25/// it in the metadata table. This ID is used to identify the source of sync
26/// changes and track per-peer cursors.
27///
28/// # Example
29/// ```
30/// use pulsedb::sync::types::InstanceId;
31///
32/// let id = InstanceId::new();
33/// println!("Instance: {}", id);
34/// assert_eq!(id.as_bytes().len(), 16);
35/// ```
36#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
37pub struct InstanceId(pub Uuid);
38
39impl InstanceId {
40    /// Creates a new InstanceId with a UUID v7 (time-ordered).
41    #[inline]
42    pub fn new() -> Self {
43        Self(Uuid::now_v7())
44    }
45
46    /// Creates a nil (all zeros) InstanceId.
47    /// Useful for testing or sentinel values.
48    #[inline]
49    pub fn nil() -> Self {
50        Self(Uuid::nil())
51    }
52
53    /// Returns the raw UUID bytes for storage.
54    #[inline]
55    pub fn as_bytes(&self) -> &[u8; 16] {
56        self.0.as_bytes()
57    }
58
59    /// Creates an InstanceId from raw bytes.
60    #[inline]
61    pub fn from_bytes(bytes: [u8; 16]) -> Self {
62        Self(Uuid::from_bytes(bytes))
63    }
64}
65
66impl Default for InstanceId {
67    /// Returns a nil (all zeros) InstanceId.
68    ///
69    /// For a new unique ID, use [`InstanceId::new()`].
70    fn default() -> Self {
71        Self::nil()
72    }
73}
74
75impl fmt::Display for InstanceId {
76    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
77        write!(f, "{}", self.0)
78    }
79}
80
81// ============================================================================
82// SyncCursor — Tracks sync position per peer
83// ============================================================================
84
85/// Tracks the sync position for a specific peer instance.
86///
87/// Each peer maintains a cursor recording the last WAL sequence number
88/// successfully synced. This enables incremental sync — only changes
89/// after the cursor position are transferred.
90#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
91pub struct SyncCursor {
92    /// The peer instance this cursor tracks.
93    pub instance_id: InstanceId,
94
95    /// The last WAL sequence number successfully synced from/to this peer.
96    pub last_sequence: u64,
97}
98
99impl SyncCursor {
100    /// Creates a new cursor at sequence 0 (beginning of WAL).
101    pub fn new(instance_id: InstanceId) -> Self {
102        Self {
103            instance_id,
104            last_sequence: 0,
105        }
106    }
107}
108
109// ============================================================================
110// SyncEntityType — What kind of entity changed
111// ============================================================================
112
113/// Discriminant for the type of entity in a sync change.
114#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
115#[repr(u8)]
116pub enum SyncEntityType {
117    /// An experience was created, updated, archived, or deleted.
118    Experience = 0,
119    /// A relation was created or deleted.
120    Relation = 1,
121    /// An insight was created or deleted.
122    Insight = 2,
123    /// A collective was created.
124    Collective = 3,
125}
126
127// ============================================================================
128// SerializableExperienceUpdate — Wire-safe mirror of ExperienceUpdate
129// ============================================================================
130
131/// Wire-safe version of [`crate::ExperienceUpdate`] for sync payloads.
132///
133/// The original `ExperienceUpdate` does not derive `Serialize`/`Deserialize`,
134/// so this struct mirrors its fields with full serde support. Use the `From`
135/// impls to convert between the two.
136#[derive(Clone, Debug, Default, Serialize, Deserialize)]
137pub struct SerializableExperienceUpdate {
138    /// New importance score (0.0–1.0).
139    pub importance: Option<f32>,
140
141    /// New confidence score (0.0–1.0).
142    pub confidence: Option<f32>,
143
144    /// Replace domain tags entirely.
145    pub domain: Option<Vec<String>>,
146
147    /// Replace related files entirely.
148    pub related_files: Option<Vec<String>>,
149
150    /// Set archived status.
151    pub archived: Option<bool>,
152}
153
154impl From<crate::experience::ExperienceUpdate> for SerializableExperienceUpdate {
155    fn from(update: crate::experience::ExperienceUpdate) -> Self {
156        Self {
157            importance: update.importance,
158            confidence: update.confidence,
159            domain: update.domain,
160            related_files: update.related_files,
161            archived: update.archived,
162        }
163    }
164}
165
166impl From<SerializableExperienceUpdate> for crate::experience::ExperienceUpdate {
167    fn from(update: SerializableExperienceUpdate) -> Self {
168        Self {
169            importance: update.importance,
170            confidence: update.confidence,
171            domain: update.domain,
172            related_files: update.related_files,
173            archived: update.archived,
174        }
175    }
176}
177
178// ============================================================================
179// SyncPayload — Full data for each mutation type
180// ============================================================================
181
182/// The payload of a sync change, containing all data needed to apply
183/// the change on the receiving end.
184///
185/// Uses full payloads (not deltas) so the receiver has everything needed
186/// including embeddings for HNSW insertion.
187#[derive(Clone, Debug, Serialize, Deserialize)]
188pub enum SyncPayload {
189    /// A new experience was created.
190    ExperienceCreated(Experience),
191
192    /// An existing experience was updated.
193    ExperienceUpdated {
194        /// The experience that was updated.
195        id: ExperienceId,
196        /// The fields that changed.
197        update: SerializableExperienceUpdate,
198        /// When the update occurred.
199        timestamp: Timestamp,
200    },
201
202    /// An experience was soft-deleted (archived).
203    ExperienceArchived {
204        /// The archived experience.
205        id: ExperienceId,
206        /// When the archive occurred.
207        timestamp: Timestamp,
208    },
209
210    /// An experience was permanently deleted.
211    ExperienceDeleted {
212        /// The deleted experience.
213        id: ExperienceId,
214        /// When the deletion occurred.
215        timestamp: Timestamp,
216    },
217
218    /// A new relation was created.
219    RelationCreated(ExperienceRelation),
220
221    /// A relation was deleted.
222    RelationDeleted {
223        /// The deleted relation.
224        id: RelationId,
225        /// When the deletion occurred.
226        timestamp: Timestamp,
227    },
228
229    /// A new insight was created.
230    InsightCreated(DerivedInsight),
231
232    /// An insight was deleted.
233    InsightDeleted {
234        /// The deleted insight.
235        id: InsightId,
236        /// When the deletion occurred.
237        timestamp: Timestamp,
238    },
239
240    /// A new collective was created.
241    CollectiveCreated(Collective),
242}
243
244// ============================================================================
245// SyncChange — A single change to sync
246// ============================================================================
247
248/// A single change event to be synchronized between PulseDB instances.
249///
250/// Contains the full payload needed to apply the change, plus metadata
251/// about the source instance and WAL position.
252#[derive(Clone, Debug, Serialize, Deserialize)]
253pub struct SyncChange {
254    /// Source WAL sequence number.
255    pub sequence: u64,
256
257    /// The instance that produced this change.
258    pub source_instance: InstanceId,
259
260    /// Which collective this change belongs to.
261    pub collective_id: CollectiveId,
262
263    /// What kind of entity changed.
264    pub entity_type: SyncEntityType,
265
266    /// The full change data.
267    pub payload: SyncPayload,
268
269    /// When the change occurred.
270    pub timestamp: Timestamp,
271}
272
273// ============================================================================
274// SyncStatus — Current state of the sync system
275// ============================================================================
276
277/// Current operational status of the sync system.
278#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
279pub enum SyncStatus {
280    /// Sync is idle, waiting for the next poll interval.
281    Idle,
282    /// Sync is actively transferring data.
283    Syncing,
284    /// Sync encountered an error.
285    Error(String),
286    /// Disconnected from the remote peer.
287    Disconnected,
288}
289
290// ============================================================================
291// Handshake messages
292// ============================================================================
293
294/// Request sent during sync handshake to establish a connection.
295#[derive(Clone, Debug, Serialize, Deserialize)]
296pub struct HandshakeRequest {
297    /// The local instance ID.
298    pub instance_id: InstanceId,
299    /// The sync protocol version.
300    pub protocol_version: u32,
301    /// Capabilities advertised by this instance.
302    pub capabilities: Vec<String>,
303}
304
305/// Response to a handshake request.
306#[derive(Clone, Debug, Serialize, Deserialize)]
307pub struct HandshakeResponse {
308    /// The remote instance ID.
309    pub instance_id: InstanceId,
310    /// The remote's protocol version.
311    pub protocol_version: u32,
312    /// Whether the handshake was accepted.
313    pub accepted: bool,
314    /// Reason for rejection, if not accepted.
315    pub reason: Option<String>,
316}
317
318// ============================================================================
319// Pull request/response
320// ============================================================================
321
322/// Request to pull changes from a remote peer.
323#[derive(Clone, Debug, Serialize, Deserialize)]
324pub struct PullRequest {
325    /// The cursor position to pull changes from.
326    pub cursor: SyncCursor,
327    /// Maximum number of changes to return in this batch.
328    pub batch_size: usize,
329    /// Optional filter: only pull changes for these collectives.
330    pub collectives: Option<Vec<CollectiveId>>,
331}
332
333/// Response containing pulled changes.
334#[derive(Clone, Debug, Serialize, Deserialize)]
335pub struct PullResponse {
336    /// The changes since the cursor position.
337    pub changes: Vec<SyncChange>,
338    /// Whether there are more changes available.
339    pub has_more: bool,
340    /// The updated cursor position after this batch.
341    pub new_cursor: SyncCursor,
342}
343
344// ============================================================================
345// Push response
346// ============================================================================
347
348/// Response after pushing changes to a remote peer.
349#[derive(Clone, Debug, Serialize, Deserialize)]
350pub struct PushResponse {
351    /// Number of changes accepted by the remote.
352    pub accepted: usize,
353    /// Number of changes rejected by the remote.
354    pub rejected: usize,
355    /// The remote's updated cursor position.
356    pub new_cursor: SyncCursor,
357}
358
359#[cfg(test)]
360mod tests {
361    use super::*;
362
363    #[test]
364    fn test_instance_id_new_is_unique() {
365        let a = InstanceId::new();
366        let b = InstanceId::new();
367        assert_ne!(a, b);
368    }
369
370    #[test]
371    fn test_instance_id_nil() {
372        let id = InstanceId::nil();
373        assert_eq!(id, InstanceId::default());
374        assert_eq!(id.0, Uuid::nil());
375    }
376
377    #[test]
378    fn test_instance_id_bytes_roundtrip() {
379        let id = InstanceId::new();
380        let bytes = *id.as_bytes();
381        let restored = InstanceId::from_bytes(bytes);
382        assert_eq!(id, restored);
383    }
384
385    #[test]
386    fn test_instance_id_display() {
387        let id = InstanceId::nil();
388        assert_eq!(id.to_string(), "00000000-0000-0000-0000-000000000000");
389    }
390
391    #[test]
392    fn test_instance_id_bincode_roundtrip() {
393        let id = InstanceId::new();
394        let bytes = bincode::serialize(&id).unwrap();
395        let restored: InstanceId = bincode::deserialize(&bytes).unwrap();
396        assert_eq!(id, restored);
397    }
398
399    #[test]
400    fn test_sync_cursor_new() {
401        let id = InstanceId::new();
402        let cursor = SyncCursor::new(id);
403        assert_eq!(cursor.instance_id, id);
404        assert_eq!(cursor.last_sequence, 0);
405    }
406
407    #[test]
408    fn test_sync_cursor_bincode_roundtrip() {
409        let cursor = SyncCursor {
410            instance_id: InstanceId::new(),
411            last_sequence: 42,
412        };
413        let bytes = bincode::serialize(&cursor).unwrap();
414        let restored: SyncCursor = bincode::deserialize(&bytes).unwrap();
415        assert_eq!(cursor, restored);
416    }
417
418    #[test]
419    fn test_sync_entity_type_repr() {
420        assert_eq!(SyncEntityType::Experience as u8, 0);
421        assert_eq!(SyncEntityType::Relation as u8, 1);
422        assert_eq!(SyncEntityType::Insight as u8, 2);
423        assert_eq!(SyncEntityType::Collective as u8, 3);
424    }
425
426    #[test]
427    fn test_serializable_experience_update_from_conversion() {
428        let update = crate::experience::ExperienceUpdate {
429            importance: Some(0.9),
430            confidence: None,
431            domain: Some(vec!["rust".to_string()]),
432            related_files: None,
433            archived: Some(false),
434        };
435        let serializable: SerializableExperienceUpdate = update.into();
436        assert_eq!(serializable.importance, Some(0.9));
437        assert_eq!(serializable.confidence, None);
438        assert_eq!(serializable.domain, Some(vec!["rust".to_string()]));
439        assert_eq!(serializable.archived, Some(false));
440    }
441
442    #[test]
443    fn test_serializable_experience_update_into_conversion() {
444        let serializable = SerializableExperienceUpdate {
445            importance: Some(0.5),
446            confidence: Some(0.8),
447            domain: None,
448            related_files: Some(vec!["main.rs".to_string()]),
449            archived: None,
450        };
451        let update: crate::experience::ExperienceUpdate = serializable.into();
452        assert_eq!(update.importance, Some(0.5));
453        assert_eq!(update.confidence, Some(0.8));
454        assert_eq!(update.related_files, Some(vec!["main.rs".to_string()]));
455    }
456
457    #[test]
458    fn test_serializable_experience_update_bincode_roundtrip() {
459        let update = SerializableExperienceUpdate {
460            importance: Some(0.7),
461            confidence: Some(0.9),
462            domain: Some(vec!["test".to_string()]),
463            related_files: None,
464            archived: Some(true),
465        };
466        let bytes = bincode::serialize(&update).unwrap();
467        let restored: SerializableExperienceUpdate = bincode::deserialize(&bytes).unwrap();
468        assert_eq!(update.importance, restored.importance);
469        assert_eq!(update.confidence, restored.confidence);
470        assert_eq!(update.domain, restored.domain);
471        assert_eq!(update.archived, restored.archived);
472    }
473
474    #[test]
475    fn test_sync_status_equality() {
476        assert_eq!(SyncStatus::Idle, SyncStatus::Idle);
477        assert_eq!(SyncStatus::Error("x".into()), SyncStatus::Error("x".into()));
478        assert_ne!(SyncStatus::Idle, SyncStatus::Syncing);
479    }
480
481    #[test]
482    fn test_handshake_request_bincode_roundtrip() {
483        let req = HandshakeRequest {
484            instance_id: InstanceId::new(),
485            protocol_version: 1,
486            capabilities: vec!["push".to_string(), "pull".to_string()],
487        };
488        let bytes = bincode::serialize(&req).unwrap();
489        let restored: HandshakeRequest = bincode::deserialize(&bytes).unwrap();
490        assert_eq!(req.instance_id, restored.instance_id);
491        assert_eq!(req.protocol_version, restored.protocol_version);
492        assert_eq!(req.capabilities, restored.capabilities);
493    }
494
495    #[test]
496    fn test_pull_request_bincode_roundtrip() {
497        let req = PullRequest {
498            cursor: SyncCursor::new(InstanceId::new()),
499            batch_size: 500,
500            collectives: Some(vec![CollectiveId::new()]),
501        };
502        let bytes = bincode::serialize(&req).unwrap();
503        let restored: PullRequest = bincode::deserialize(&bytes).unwrap();
504        assert_eq!(req.cursor, restored.cursor);
505        assert_eq!(req.batch_size, restored.batch_size);
506    }
507
508    #[test]
509    fn test_push_response_bincode_roundtrip() {
510        let resp = PushResponse {
511            accepted: 10,
512            rejected: 2,
513            new_cursor: SyncCursor {
514                instance_id: InstanceId::new(),
515                last_sequence: 100,
516            },
517        };
518        let bytes = bincode::serialize(&resp).unwrap();
519        let restored: PushResponse = bincode::deserialize(&bytes).unwrap();
520        assert_eq!(resp.accepted, restored.accepted);
521        assert_eq!(resp.rejected, restored.rejected);
522        assert_eq!(resp.new_cursor, restored.new_cursor);
523    }
524}