Skip to main content

force_sync/
model.rs

1//! Change envelope and cursor types for sync events.
2
3use chrono::{DateTime, Utc};
4use serde_json::Value;
5
6use crate::identity::SyncKey;
7
8/// Origin system for a captured change.
9#[derive(Debug, Clone, Copy, PartialEq, Eq)]
10pub enum SourceSystem {
11    /// Change originated from Salesforce.
12    Salesforce,
13    /// Change originated from Postgres.
14    Postgres,
15}
16
17impl SourceSystem {
18    /// Returns the database representation used by the sync journal.
19    #[must_use]
20    pub const fn as_db_value(self) -> &'static str {
21        match self {
22            Self::Salesforce => "salesforce",
23            Self::Postgres => "postgres",
24        }
25    }
26}
27
28/// Mutation type carried by a change envelope.
29#[derive(Debug, Clone, Copy, PartialEq, Eq)]
30pub enum ChangeOperation {
31    /// Create or update semantics.
32    Upsert,
33    /// Delete semantics.
34    Delete,
35}
36
37impl ChangeOperation {
38    /// Returns the database representation used by the sync journal.
39    #[must_use]
40    pub const fn as_db_value(self) -> &'static str {
41        match self {
42            Self::Upsert => "upsert",
43            Self::Delete => "delete",
44        }
45    }
46}
47
48/// Source-specific cursor for replayable capture streams.
49#[derive(Debug, Clone, PartialEq, Eq)]
50pub enum SourceCursor {
51    /// Salesforce CDC replay identifier.
52    SalesforceReplayId(i64),
53    /// Postgres logical sequence number or outbox position.
54    PostgresLsn(String),
55    /// Snapshot or backfill watermark.
56    Snapshot(String),
57}
58
59impl SourceCursor {
60    /// Returns the database representation used by the sync journal.
61    #[must_use]
62    pub fn as_db_value(&self) -> String {
63        use std::fmt::Write;
64        match self {
65            Self::SalesforceReplayId(replay_id) => {
66                let mut s = String::with_capacity(22 + 20); // prefix + max digits for i64
67                let _ = write!(s, "salesforce-replay-id:{replay_id}");
68                s
69            }
70            Self::PostgresLsn(lsn) => {
71                let mut s = String::with_capacity(13 + lsn.len());
72                s.push_str("postgres-lsn:");
73                s.push_str(lsn);
74                s
75            }
76            Self::Snapshot(watermark) => {
77                let mut s = String::with_capacity(9 + watermark.len());
78                s.push_str("snapshot:");
79                s.push_str(watermark);
80                s
81            }
82        }
83    }
84}
85
86/// Canonical sync event stored in the journal and passed to planners.
87#[derive(Debug, Clone, PartialEq, Eq)]
88pub struct ChangeEnvelope {
89    sync_key: SyncKey,
90    source: SourceSystem,
91    operation: ChangeOperation,
92    cursor: Option<SourceCursor>,
93    observed_at: DateTime<Utc>,
94    payload: Value,
95}
96
97impl ChangeEnvelope {
98    /// Creates a new change envelope with no replay cursor.
99    #[must_use]
100    pub const fn new(
101        sync_key: SyncKey,
102        source: SourceSystem,
103        operation: ChangeOperation,
104        observed_at: DateTime<Utc>,
105        payload: Value,
106    ) -> Self {
107        Self {
108            sync_key,
109            source,
110            operation,
111            cursor: None,
112            observed_at,
113            payload,
114        }
115    }
116
117    /// Attaches a replay cursor to the envelope.
118    #[must_use]
119    pub fn with_cursor(mut self, cursor: SourceCursor) -> Self {
120        self.cursor = Some(cursor);
121        self
122    }
123
124    /// Returns the canonical record identity.
125    #[must_use]
126    pub const fn sync_key(&self) -> &SyncKey {
127        &self.sync_key
128    }
129
130    /// Returns the source system for the change.
131    #[must_use]
132    pub const fn source(&self) -> SourceSystem {
133        self.source
134    }
135
136    /// Returns the operation carried by the change.
137    #[must_use]
138    pub const fn operation(&self) -> ChangeOperation {
139        self.operation
140    }
141
142    /// Returns the source cursor, if available.
143    #[must_use]
144    pub const fn cursor(&self) -> Option<&SourceCursor> {
145        self.cursor.as_ref()
146    }
147
148    /// Returns when the change was observed by the sync engine.
149    #[must_use]
150    pub const fn observed_at(&self) -> DateTime<Utc> {
151        self.observed_at
152    }
153
154    /// Returns the raw change payload.
155    #[must_use]
156    pub const fn payload(&self) -> &Value {
157        &self.payload
158    }
159
160    /// Returns a stable hash for the payload contents.
161    #[must_use]
162    pub fn payload_hash(&self) -> [u8; 32] {
163        payload_hash(&self.payload)
164    }
165
166    /// Returns `true` when the provided payload hashes to the same value.
167    #[must_use]
168    pub fn payload_hash_matches(&self, other: &Value) -> bool {
169        self.payload_hash() == payload_hash(other)
170    }
171}
172
173/// Returns a stable BLAKE3 hash for a JSON payload.
174#[must_use]
175pub fn payload_hash(payload: &Value) -> [u8; 32] {
176    let mut hasher = blake3::Hasher::new();
177    // ⚡ Bolt: Write JSON directly into the hasher without an intermediate String allocation.
178    // We avoid `payload.clone()` by manually sorting object keys dynamically during hashing.
179    hash_json_value(payload, &mut hasher);
180    *hasher.finalize().as_bytes()
181}
182
183fn hash_json_value(value: &Value, hasher: &mut blake3::Hasher) {
184    use std::io::Write;
185    match value {
186        Value::Object(map) => {
187            let _ = Write::write_all(hasher, b"{");
188            // ⚡ Bolt: Provide a capacity hint for the intermediate vector to avoid reallocations
189            let mut iter = Vec::with_capacity(map.len());
190            iter.extend(map.iter());
191            iter.sort_unstable_by_key(|(k, _)| *k);
192            let mut first = true;
193            for (k, v) in iter {
194                if !first {
195                    let _ = Write::write_all(hasher, b",");
196                }
197                first = false;
198                let _ = serde_json::to_writer(&mut *hasher, k);
199                let _ = Write::write_all(hasher, b":");
200                hash_json_value(v, hasher);
201            }
202            let _ = Write::write_all(hasher, b"}");
203        }
204        Value::Array(arr) => {
205            let _ = Write::write_all(hasher, b"[");
206            let mut first = true;
207            for v in arr {
208                if !first {
209                    let _ = Write::write_all(hasher, b",");
210                }
211                first = false;
212                hash_json_value(v, hasher);
213            }
214            let _ = Write::write_all(hasher, b"]");
215        }
216        _ => {
217            let _ = serde_json::to_writer(hasher, value);
218        }
219    }
220}
221
222#[cfg(test)]
223mod tests {
224    use chrono::Utc;
225    use serde_json::json;
226
227    use crate::identity::SyncKey;
228
229    use super::{ChangeEnvelope, ChangeOperation, SourceCursor, SourceSystem};
230
231    #[test]
232    fn change_envelope_payload_hash_is_stable_for_identical_payloads() {
233        let sync_key = match SyncKey::new("tenant", "Account", "abc") {
234            Ok(sync_key) => sync_key,
235            Err(error) => panic!("unexpected sync key construction error: {error}"),
236        };
237        let payload = json!({"Name": "Acme"});
238
239        let first = ChangeEnvelope::new(
240            sync_key.clone(),
241            SourceSystem::Salesforce,
242            ChangeOperation::Upsert,
243            Utc::now(),
244            payload.clone(),
245        );
246        let second = ChangeEnvelope::new(
247            sync_key,
248            SourceSystem::Salesforce,
249            ChangeOperation::Upsert,
250            Utc::now(),
251            payload,
252        );
253
254        assert_eq!(first.payload_hash(), second.payload_hash());
255    }
256
257    #[test]
258    fn payload_hash_matches_pre_sorted_payloads() {
259        let payload = json!({
260            "outer": {
261                "zeta": 1,
262                "alpha": 2
263            },
264            "items": [
265                {
266                    "delta": 4,
267                    "beta": 3
268                }
269            ]
270        });
271        let mut sorted_payload = payload.clone();
272        sorted_payload.sort_all_objects();
273
274        assert_eq!(
275            super::payload_hash(&payload),
276            super::payload_hash(&sorted_payload)
277        );
278    }
279
280    #[test]
281    fn change_envelope_with_cursor_attaches_cursor() {
282        let sync_key = match SyncKey::new("tenant", "Account", "abc") {
283            Ok(sync_key) => sync_key,
284            Err(error) => panic!("unexpected sync key construction error: {error}"),
285        };
286
287        let envelope = ChangeEnvelope::new(
288            sync_key,
289            SourceSystem::Salesforce,
290            ChangeOperation::Upsert,
291            Utc::now(),
292            json!({"Name": "Acme"}),
293        )
294        .with_cursor(SourceCursor::SalesforceReplayId(42));
295
296        assert!(matches!(
297            envelope.cursor(),
298            Some(SourceCursor::SalesforceReplayId(42))
299        ));
300    }
301
302    #[test]
303    fn change_envelope_payload_hash_matches_semantically_equal_payloads() {
304        let sync_key = match SyncKey::new("tenant", "Account", "abc") {
305            Ok(sync_key) => sync_key,
306            Err(error) => panic!("unexpected sync key construction error: {error}"),
307        };
308
309        let envelope = ChangeEnvelope::new(
310            sync_key,
311            SourceSystem::Salesforce,
312            ChangeOperation::Upsert,
313            Utc::now(),
314            json!({
315                "Name": "Acme",
316                "Description": "Keep"
317            }),
318        );
319
320        assert!(envelope.payload_hash_matches(&json!({
321            "Description": "Keep",
322            "Name": "Acme"
323        })));
324    }
325}