1use chrono::{DateTime, Utc};
4use serde_json::Value;
5
6use crate::identity::SyncKey;
7
/// Identifies which source system a change came from.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SourceSystem {
    /// The Salesforce source.
    Salesforce,
    /// The Postgres source.
    Postgres,
}

impl SourceSystem {
    /// Returns the fixed lowercase name of this source system.
    ///
    /// The mapping is `Salesforce` -> `"salesforce"` and `Postgres` -> `"postgres"`.
    #[must_use]
    pub const fn as_db_value(self) -> &'static str {
        match self {
            Self::Salesforce => "salesforce",
            Self::Postgres => "postgres",
        }
    }
}
27
/// The kind of change being described.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangeOperation {
    /// An upsert of a record.
    Upsert,
    /// A deletion of a record.
    Delete,
}

impl ChangeOperation {
    /// Returns the fixed lowercase name of this operation.
    ///
    /// The mapping is `Upsert` -> `"upsert"` and `Delete` -> `"delete"`.
    #[must_use]
    pub const fn as_db_value(self) -> &'static str {
        match self {
            Self::Upsert => "upsert",
            Self::Delete => "delete",
        }
    }
}
47
/// A position marker within a source, in one of several source-specific forms.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SourceCursor {
    /// A Salesforce replay id.
    SalesforceReplayId(i64),
    /// A Postgres LSN, kept in its string form.
    PostgresLsn(String),
    /// A snapshot watermark, kept in its string form.
    Snapshot(String),
}

impl SourceCursor {
    /// Renders the cursor as one string: a variant-specific prefix followed by the wrapped value.
    ///
    /// The prefixes are `salesforce-replay-id:`, `postgres-lsn:` and `snapshot:`. The wrapped
    /// value is appended unchanged (the replay id in decimal).
    #[must_use]
    pub fn as_db_value(&self) -> String {
        match self {
            Self::SalesforceReplayId(replay_id) => format!("salesforce-replay-id:{replay_id}"),
            Self::PostgresLsn(lsn) => format!("postgres-lsn:{lsn}"),
            Self::Snapshot(watermark) => format!("snapshot:{watermark}"),
        }
    }
}
85
86#[derive(Debug, Clone, PartialEq, Eq)]
88pub struct ChangeEnvelope {
89 sync_key: SyncKey,
90 source: SourceSystem,
91 operation: ChangeOperation,
92 cursor: Option<SourceCursor>,
93 observed_at: DateTime<Utc>,
94 payload: Value,
95}
96
97impl ChangeEnvelope {
98 #[must_use]
100 pub const fn new(
101 sync_key: SyncKey,
102 source: SourceSystem,
103 operation: ChangeOperation,
104 observed_at: DateTime<Utc>,
105 payload: Value,
106 ) -> Self {
107 Self {
108 sync_key,
109 source,
110 operation,
111 cursor: None,
112 observed_at,
113 payload,
114 }
115 }
116
117 #[must_use]
119 pub fn with_cursor(mut self, cursor: SourceCursor) -> Self {
120 self.cursor = Some(cursor);
121 self
122 }
123
124 #[must_use]
126 pub const fn sync_key(&self) -> &SyncKey {
127 &self.sync_key
128 }
129
130 #[must_use]
132 pub const fn source(&self) -> SourceSystem {
133 self.source
134 }
135
136 #[must_use]
138 pub const fn operation(&self) -> ChangeOperation {
139 self.operation
140 }
141
142 #[must_use]
144 pub const fn cursor(&self) -> Option<&SourceCursor> {
145 self.cursor.as_ref()
146 }
147
148 #[must_use]
150 pub const fn observed_at(&self) -> DateTime<Utc> {
151 self.observed_at
152 }
153
154 #[must_use]
156 pub const fn payload(&self) -> &Value {
157 &self.payload
158 }
159
160 #[must_use]
162 pub fn payload_hash(&self) -> [u8; 32] {
163 payload_hash(&self.payload)
164 }
165
166 #[must_use]
168 pub fn payload_hash_matches(&self, other: &Value) -> bool {
169 self.payload_hash() == payload_hash(other)
170 }
171}
172
173#[must_use]
175pub fn payload_hash(payload: &Value) -> [u8; 32] {
176 let mut hasher = blake3::Hasher::new();
177 hash_json_value(payload, &mut hasher);
180 *hasher.finalize().as_bytes()
181}
182
183fn hash_json_value(value: &Value, hasher: &mut blake3::Hasher) {
184 use std::io::Write;
185 match value {
186 Value::Object(map) => {
187 let _ = Write::write_all(hasher, b"{");
188 let mut iter = Vec::with_capacity(map.len());
190 iter.extend(map.iter());
191 iter.sort_unstable_by_key(|(k, _)| *k);
192 let mut first = true;
193 for (k, v) in iter {
194 if !first {
195 let _ = Write::write_all(hasher, b",");
196 }
197 first = false;
198 let _ = serde_json::to_writer(&mut *hasher, k);
199 let _ = Write::write_all(hasher, b":");
200 hash_json_value(v, hasher);
201 }
202 let _ = Write::write_all(hasher, b"}");
203 }
204 Value::Array(arr) => {
205 let _ = Write::write_all(hasher, b"[");
206 let mut first = true;
207 for v in arr {
208 if !first {
209 let _ = Write::write_all(hasher, b",");
210 }
211 first = false;
212 hash_json_value(v, hasher);
213 }
214 let _ = Write::write_all(hasher, b"]");
215 }
216 _ => {
217 let _ = serde_json::to_writer(hasher, value);
218 }
219 }
220}
221
#[cfg(test)]
mod tests {
    use chrono::Utc;
    use serde_json::{json, Value};

    use crate::identity::SyncKey;

    use super::{ChangeEnvelope, ChangeOperation, SourceCursor, SourceSystem};

    /// Builds the sync key shared by these tests, panicking if construction fails.
    fn account_sync_key() -> SyncKey {
        match SyncKey::new("tenant", "Account", "abc") {
            Ok(sync_key) => sync_key,
            Err(error) => panic!("unexpected sync key construction error: {error}"),
        }
    }

    /// Wraps `payload` in a Salesforce upsert envelope observed at the current time.
    fn salesforce_upsert(sync_key: SyncKey, payload: Value) -> ChangeEnvelope {
        ChangeEnvelope::new(
            sync_key,
            SourceSystem::Salesforce,
            ChangeOperation::Upsert,
            Utc::now(),
            payload,
        )
    }

    #[test]
    fn change_envelope_payload_hash_is_stable_for_identical_payloads() {
        let sync_key = account_sync_key();
        let payload = json!({"Name": "Acme"});

        // Two envelopes built separately from the same payload must hash alike.
        let first = salesforce_upsert(sync_key.clone(), payload.clone());
        let second = salesforce_upsert(sync_key, payload);

        assert_eq!(first.payload_hash(), second.payload_hash());
    }

    #[test]
    fn payload_hash_matches_pre_sorted_payloads() {
        let payload = json!({
            "outer": {
                "zeta": 1,
                "alpha": 2
            },
            "items": [
                {
                    "delta": 4,
                    "beta": 3
                }
            ]
        });
        let sorted_payload = {
            let mut copy = payload.clone();
            copy.sort_all_objects();
            copy
        };

        let unsorted_hash = super::payload_hash(&payload);
        let sorted_hash = super::payload_hash(&sorted_payload);

        assert_eq!(unsorted_hash, sorted_hash);
    }

    #[test]
    fn change_envelope_with_cursor_attaches_cursor() {
        let envelope = salesforce_upsert(account_sync_key(), json!({"Name": "Acme"}))
            .with_cursor(SourceCursor::SalesforceReplayId(42));

        assert!(matches!(
            envelope.cursor(),
            Some(SourceCursor::SalesforceReplayId(42))
        ));
    }

    #[test]
    fn change_envelope_payload_hash_matches_semantically_equal_payloads() {
        let envelope = salesforce_upsert(
            account_sync_key(),
            json!({
                "Name": "Acme",
                "Description": "Keep"
            }),
        );

        // Same members as above, written in the opposite key order.
        let reordered = json!({
            "Description": "Keep",
            "Name": "Acme"
        });

        assert!(envelope.payload_hash_matches(&reordered));
    }
}