Skip to main content

force_sync/
model.rs

//! Change envelope and cursor types for sync events.
2
3use chrono::{DateTime, Utc};
4use serde_json::Value;
5
6use crate::identity::SyncKey;
7
/// Identifies which system a captured change came from.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SourceSystem {
    /// The change was captured from Salesforce.
    Salesforce,
    /// The change was captured from Postgres.
    Postgres,
}

impl SourceSystem {
    /// Maps the variant to the lowercase label the sync journal stores for it.
    ///
    /// The match is deliberately exhaustive (no catch-all arm) so that adding
    /// a variant forces a label to be chosen here.
    #[must_use]
    pub const fn as_db_value(self) -> &'static str {
        match self {
            Self::Postgres => "postgres",
            Self::Salesforce => "salesforce",
        }
    }
}
27
/// Kind of mutation a change envelope carries.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangeOperation {
    /// The record was created or updated.
    Upsert,
    /// The record was deleted.
    Delete,
}

impl ChangeOperation {
    /// Maps the variant to the lowercase label the sync journal stores for it.
    ///
    /// The match is deliberately exhaustive (no catch-all arm) so that adding
    /// a variant forces a label to be chosen here.
    #[must_use]
    pub const fn as_db_value(self) -> &'static str {
        match self {
            Self::Delete => "delete",
            Self::Upsert => "upsert",
        }
    }
}
47
/// Position within a replayable capture stream, tagged by the kind of stream.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SourceCursor {
    /// Replay identifier from Salesforce CDC.
    SalesforceReplayId(i64),
    /// Logical sequence number or outbox position from Postgres.
    PostgresLsn(String),
    /// Watermark of a snapshot or backfill.
    Snapshot(String),
}

impl SourceCursor {
    /// Renders the cursor as the `<kind>:<position>` string stored in the
    /// sync journal.
    ///
    /// The kind prefix is `salesforce-replay-id`, `postgres-lsn`, or
    /// `snapshot`; the position is the variant's inner value formatted with
    /// `Display`.
    #[must_use]
    pub fn as_db_value(&self) -> String {
        match self {
            Self::SalesforceReplayId(replay_id) => Self::tagged("salesforce-replay-id", replay_id),
            Self::PostgresLsn(lsn) => Self::tagged("postgres-lsn", lsn),
            Self::Snapshot(watermark) => Self::tagged("snapshot", watermark),
        }
    }

    /// Joins a kind prefix and a position with a single `:` separator.
    fn tagged(kind: &str, position: impl std::fmt::Display) -> String {
        format!("{kind}:{position}")
    }
}
70
71/// Canonical sync event stored in the journal and passed to planners.
72#[derive(Debug, Clone, PartialEq, Eq)]
73pub struct ChangeEnvelope {
74    sync_key: SyncKey,
75    source: SourceSystem,
76    operation: ChangeOperation,
77    cursor: Option<SourceCursor>,
78    observed_at: DateTime<Utc>,
79    payload: Value,
80}
81
82impl ChangeEnvelope {
83    /// Creates a new change envelope with no replay cursor.
84    #[must_use]
85    pub const fn new(
86        sync_key: SyncKey,
87        source: SourceSystem,
88        operation: ChangeOperation,
89        observed_at: DateTime<Utc>,
90        payload: Value,
91    ) -> Self {
92        Self {
93            sync_key,
94            source,
95            operation,
96            cursor: None,
97            observed_at,
98            payload,
99        }
100    }
101
102    /// Attaches a replay cursor to the envelope.
103    #[must_use]
104    pub fn with_cursor(mut self, cursor: SourceCursor) -> Self {
105        self.cursor = Some(cursor);
106        self
107    }
108
109    /// Returns the canonical record identity.
110    #[must_use]
111    pub const fn sync_key(&self) -> &SyncKey {
112        &self.sync_key
113    }
114
115    /// Returns the source system for the change.
116    #[must_use]
117    pub const fn source(&self) -> SourceSystem {
118        self.source
119    }
120
121    /// Returns the operation carried by the change.
122    #[must_use]
123    pub const fn operation(&self) -> ChangeOperation {
124        self.operation
125    }
126
127    /// Returns the source cursor, if available.
128    #[must_use]
129    pub const fn cursor(&self) -> Option<&SourceCursor> {
130        self.cursor.as_ref()
131    }
132
133    /// Returns when the change was observed by the sync engine.
134    #[must_use]
135    pub const fn observed_at(&self) -> DateTime<Utc> {
136        self.observed_at
137    }
138
139    /// Returns the raw change payload.
140    #[must_use]
141    pub const fn payload(&self) -> &Value {
142        &self.payload
143    }
144
145    /// Returns a stable hash for the payload contents.
146    #[must_use]
147    pub fn payload_hash(&self) -> [u8; 32] {
148        payload_hash(&self.payload)
149    }
150
151    /// Returns `true` when the provided payload hashes to the same value.
152    #[must_use]
153    pub fn payload_hash_matches(&self, other: &Value) -> bool {
154        self.payload_hash() == payload_hash(other)
155    }
156}
157
158/// Returns a stable BLAKE3 hash for a JSON payload.
159#[must_use]
160pub fn payload_hash(payload: &Value) -> [u8; 32] {
161    let mut canonical_payload = payload.clone();
162    canonical_payload.sort_all_objects();
163
164    *blake3::hash(canonical_payload.to_string().as_bytes()).as_bytes()
165}
166
#[cfg(test)]
mod tests {
    use chrono::Utc;
    use serde_json::json;

    use crate::identity::SyncKey;

    use super::{ChangeEnvelope, ChangeOperation, SourceCursor, SourceSystem};

    /// Builds the sync key shared by the envelope tests, panicking with the
    /// construction error if the key is rejected.
    fn account_sync_key() -> SyncKey {
        match SyncKey::new("tenant", "Account", "abc") {
            Ok(sync_key) => sync_key,
            Err(error) => panic!("unexpected sync key construction error: {error}"),
        }
    }

    /// Builds a Salesforce upsert envelope, observed now, around `payload`.
    fn salesforce_upsert(payload: serde_json::Value) -> ChangeEnvelope {
        ChangeEnvelope::new(
            account_sync_key(),
            SourceSystem::Salesforce,
            ChangeOperation::Upsert,
            Utc::now(),
            payload,
        )
    }

    #[test]
    fn change_envelope_payload_hash_is_stable_for_identical_payloads() {
        let payload = json!({"Name": "Acme"});

        // Two envelopes built at different instants around the same payload.
        let first = salesforce_upsert(payload.clone());
        let second = salesforce_upsert(payload);

        assert_eq!(first.payload_hash(), second.payload_hash());
    }

    #[test]
    fn payload_hash_matches_pre_sorted_payloads() {
        // Keys are intentionally out of order, both nested and inside an array.
        let unsorted = json!({
            "outer": {
                "zeta": 1,
                "alpha": 2
            },
            "items": [
                {
                    "delta": 4,
                    "beta": 3
                }
            ]
        });
        let sorted = {
            let mut copy = unsorted.clone();
            copy.sort_all_objects();
            copy
        };

        assert_eq!(super::payload_hash(&unsorted), super::payload_hash(&sorted));
    }

    #[test]
    fn change_envelope_with_cursor_attaches_cursor() {
        let envelope = salesforce_upsert(json!({"Name": "Acme"}))
            .with_cursor(SourceCursor::SalesforceReplayId(42));

        assert!(matches!(
            envelope.cursor(),
            Some(SourceCursor::SalesforceReplayId(42))
        ));
    }

    #[test]
    fn change_envelope_payload_hash_matches_semantically_equal_payloads() {
        let envelope = salesforce_upsert(json!({
            "Name": "Acme",
            "Description": "Keep"
        }));

        // Same members, written in the opposite key order.
        let reordered = json!({
            "Description": "Keep",
            "Name": "Acme"
        });

        assert!(envelope.payload_hash_matches(&reordered));
    }
}