1use chrono::{DateTime, Utc};
4use serde_json::Value;
5
6use crate::identity::SyncKey;
7
8#[derive(Debug, Clone, Copy, PartialEq, Eq)]
10pub enum SourceSystem {
11 Salesforce,
13 Postgres,
15}
16
17impl SourceSystem {
18 #[must_use]
20 pub const fn as_db_value(self) -> &'static str {
21 match self {
22 Self::Salesforce => "salesforce",
23 Self::Postgres => "postgres",
24 }
25 }
26}
27
28#[derive(Debug, Clone, Copy, PartialEq, Eq)]
30pub enum ChangeOperation {
31 Upsert,
33 Delete,
35}
36
37impl ChangeOperation {
38 #[must_use]
40 pub const fn as_db_value(self) -> &'static str {
41 match self {
42 Self::Upsert => "upsert",
43 Self::Delete => "delete",
44 }
45 }
46}
47
48#[derive(Debug, Clone, PartialEq, Eq)]
50pub enum SourceCursor {
51 SalesforceReplayId(i64),
53 PostgresLsn(String),
55 Snapshot(String),
57}
58
59impl SourceCursor {
60 #[must_use]
62 pub fn as_db_value(&self) -> String {
63 match self {
64 Self::SalesforceReplayId(replay_id) => format!("salesforce-replay-id:{replay_id}"),
65 Self::PostgresLsn(lsn) => format!("postgres-lsn:{lsn}"),
66 Self::Snapshot(watermark) => format!("snapshot:{watermark}"),
67 }
68 }
69}
70
71#[derive(Debug, Clone, PartialEq, Eq)]
73pub struct ChangeEnvelope {
74 sync_key: SyncKey,
75 source: SourceSystem,
76 operation: ChangeOperation,
77 cursor: Option<SourceCursor>,
78 observed_at: DateTime<Utc>,
79 payload: Value,
80}
81
82impl ChangeEnvelope {
83 #[must_use]
85 pub const fn new(
86 sync_key: SyncKey,
87 source: SourceSystem,
88 operation: ChangeOperation,
89 observed_at: DateTime<Utc>,
90 payload: Value,
91 ) -> Self {
92 Self {
93 sync_key,
94 source,
95 operation,
96 cursor: None,
97 observed_at,
98 payload,
99 }
100 }
101
102 #[must_use]
104 pub fn with_cursor(mut self, cursor: SourceCursor) -> Self {
105 self.cursor = Some(cursor);
106 self
107 }
108
109 #[must_use]
111 pub const fn sync_key(&self) -> &SyncKey {
112 &self.sync_key
113 }
114
115 #[must_use]
117 pub const fn source(&self) -> SourceSystem {
118 self.source
119 }
120
121 #[must_use]
123 pub const fn operation(&self) -> ChangeOperation {
124 self.operation
125 }
126
127 #[must_use]
129 pub const fn cursor(&self) -> Option<&SourceCursor> {
130 self.cursor.as_ref()
131 }
132
133 #[must_use]
135 pub const fn observed_at(&self) -> DateTime<Utc> {
136 self.observed_at
137 }
138
139 #[must_use]
141 pub const fn payload(&self) -> &Value {
142 &self.payload
143 }
144
145 #[must_use]
147 pub fn payload_hash(&self) -> [u8; 32] {
148 payload_hash(&self.payload)
149 }
150
151 #[must_use]
153 pub fn payload_hash_matches(&self, other: &Value) -> bool {
154 self.payload_hash() == payload_hash(other)
155 }
156}
157
158#[must_use]
160pub fn payload_hash(payload: &Value) -> [u8; 32] {
161 let mut canonical_payload = payload.clone();
162 canonical_payload.sort_all_objects();
163
164 *blake3::hash(canonical_payload.to_string().as_bytes()).as_bytes()
165}
166
167#[cfg(test)]
168mod tests {
169 use chrono::Utc;
170 use serde_json::json;
171
172 use crate::identity::SyncKey;
173
174 use super::{ChangeEnvelope, ChangeOperation, SourceCursor, SourceSystem};
175
176 #[test]
177 fn change_envelope_payload_hash_is_stable_for_identical_payloads() {
178 let sync_key = match SyncKey::new("tenant", "Account", "abc") {
179 Ok(sync_key) => sync_key,
180 Err(error) => panic!("unexpected sync key construction error: {error}"),
181 };
182 let payload = json!({"Name": "Acme"});
183
184 let first = ChangeEnvelope::new(
185 sync_key.clone(),
186 SourceSystem::Salesforce,
187 ChangeOperation::Upsert,
188 Utc::now(),
189 payload.clone(),
190 );
191 let second = ChangeEnvelope::new(
192 sync_key,
193 SourceSystem::Salesforce,
194 ChangeOperation::Upsert,
195 Utc::now(),
196 payload,
197 );
198
199 assert_eq!(first.payload_hash(), second.payload_hash());
200 }
201
202 #[test]
203 fn payload_hash_matches_pre_sorted_payloads() {
204 let payload = json!({
205 "outer": {
206 "zeta": 1,
207 "alpha": 2
208 },
209 "items": [
210 {
211 "delta": 4,
212 "beta": 3
213 }
214 ]
215 });
216 let mut sorted_payload = payload.clone();
217 sorted_payload.sort_all_objects();
218
219 assert_eq!(
220 super::payload_hash(&payload),
221 super::payload_hash(&sorted_payload)
222 );
223 }
224
225 #[test]
226 fn change_envelope_with_cursor_attaches_cursor() {
227 let sync_key = match SyncKey::new("tenant", "Account", "abc") {
228 Ok(sync_key) => sync_key,
229 Err(error) => panic!("unexpected sync key construction error: {error}"),
230 };
231
232 let envelope = ChangeEnvelope::new(
233 sync_key,
234 SourceSystem::Salesforce,
235 ChangeOperation::Upsert,
236 Utc::now(),
237 json!({"Name": "Acme"}),
238 )
239 .with_cursor(SourceCursor::SalesforceReplayId(42));
240
241 assert!(matches!(
242 envelope.cursor(),
243 Some(SourceCursor::SalesforceReplayId(42))
244 ));
245 }
246
247 #[test]
248 fn change_envelope_payload_hash_matches_semantically_equal_payloads() {
249 let sync_key = match SyncKey::new("tenant", "Account", "abc") {
250 Ok(sync_key) => sync_key,
251 Err(error) => panic!("unexpected sync key construction error: {error}"),
252 };
253
254 let envelope = ChangeEnvelope::new(
255 sync_key,
256 SourceSystem::Salesforce,
257 ChangeOperation::Upsert,
258 Utc::now(),
259 json!({
260 "Name": "Acme",
261 "Description": "Keep"
262 }),
263 );
264
265 assert!(envelope.payload_hash_matches(&json!({
266 "Description": "Keep",
267 "Name": "Acme"
268 })));
269 }
270}