Skip to main content

force_sync/capture/
salesforce.rs

1//! Stream-driven Salesforce CDC capture.
2
3use force_pubsub::{PubSubEvent, ReplayId};
4use futures::{Stream, StreamExt};
5use serde_json::Value;
6use tokio_postgres::GenericClient;
7
8use crate::{
9    config::ObjectSync,
10    error::ForceSyncError,
11    identity::SyncKey,
12    model::{ChangeEnvelope, ChangeOperation, SourceCursor, SourceSystem},
13    store::pg::{AppendResult, PgStore},
14};
15
16fn change_operation(payload: &Value) -> ChangeOperation {
17    match payload
18        .get("ChangeEventHeader")
19        .and_then(|header| header.get("changeType"))
20        .and_then(Value::as_str)
21    {
22        Some("DELETE") => ChangeOperation::Delete,
23        _ => ChangeOperation::Upsert,
24    }
25}
26
27fn replay_id_to_position(replay_id: &ReplayId) -> Result<i64, ForceSyncError> {
28    if replay_id.as_bytes().len() > 8 {
29        return Err(ForceSyncError::InvalidStoredValue {
30            field: "replay_id",
31            value: format!("{} bytes", replay_id.as_bytes().len()),
32        });
33    }
34
35    let mut padded = [0u8; 8];
36    let start = 8 - replay_id.as_bytes().len();
37    padded[start..].copy_from_slice(replay_id.as_bytes());
38
39    let unsigned = u64::from_be_bytes(padded);
40    i64::try_from(unsigned).map_err(|_| ForceSyncError::InvalidStoredValue {
41        field: "replay_id",
42        value: unsigned.to_string(),
43    })
44}
45
46fn replay_id_from_position(position: i64) -> Result<ReplayId, ForceSyncError> {
47    let unsigned = u64::try_from(position).map_err(|_| ForceSyncError::InvalidStoredValue {
48        field: "cursor_position",
49        value: position.to_string(),
50    })?;
51    let bytes = unsigned.to_be_bytes();
52    let first_non_zero = bytes.iter().position(|byte| *byte != 0).unwrap_or(7);
53    Ok(ReplayId::from_bytes(bytes[first_non_zero..].to_vec()))
54}
55
56fn build_envelope(
57    tenant: &str,
58    object: &ObjectSync,
59    payload: Value,
60    replay_position: i64,
61) -> Result<ChangeEnvelope, ForceSyncError> {
62    let external_id_field =
63        object
64            .external_id_field()
65            .ok_or(ForceSyncError::MissingConfiguration {
66                field: "external_id_field",
67            })?;
68    let external_id = payload
69        .get(external_id_field)
70        .and_then(Value::as_str)
71        .ok_or_else(|| ForceSyncError::InvalidStoredValue {
72            field: "external_id",
73            value: payload.to_string(),
74        })?;
75
76    Ok(ChangeEnvelope::new(
77        SyncKey::new(
78            tenant.to_owned(),
79            object.object_name().to_owned(),
80            external_id.to_owned(),
81        )?,
82        SourceSystem::Salesforce,
83        change_operation(&payload),
84        chrono::Utc::now(),
85        payload,
86    )
87    .with_cursor(SourceCursor::SalesforceReplayId(replay_position)))
88}
89
90async fn capture_event_in_tx<C>(
91    client: &C,
92    stream_name: &str,
93    tenant: &str,
94    object: &ObjectSync,
95    payload: Value,
96    replay_id: &ReplayId,
97) -> Result<bool, ForceSyncError>
98where
99    C: GenericClient + Sync + ?Sized,
100{
101    let replay_position = replay_id_to_position(replay_id)?;
102    let envelope = build_envelope(tenant, object, payload, replay_position)?;
103    let cursor = envelope
104        .cursor()
105        .ok_or(ForceSyncError::MissingSourceCursor)?
106        .as_db_value();
107
108    let inserted = match PgStore::append_journal_if_new_in_tx(client, &envelope).await? {
109        AppendResult::Inserted { journal_id } => {
110            PgStore::enqueue_apply_task_in_tx(client, journal_id, 0).await?;
111            true
112        }
113        AppendResult::Duplicate => false,
114    };
115
116    PgStore::advance_checkpoint_if_greater_in_tx(client, stream_name, replay_position, &cursor)
117        .await?;
118    Ok(inserted)
119}
120
121/// Captures a decoded CDC stream into the sync journal and checkpoints replay positions.
122///
123/// # Errors
124///
125/// Returns an error if Pub/Sub decoding, journal writes, or checkpoint updates fail.
126pub async fn capture_stream<S>(
127    store: &PgStore,
128    stream_name: &str,
129    tenant: &str,
130    object: &ObjectSync,
131    mut stream: S,
132) -> Result<usize, ForceSyncError>
133where
134    S: Stream<Item = Result<PubSubEvent<Value>, force_pubsub::PubSubError>> + Unpin,
135{
136    let mut captured = 0usize;
137
138    while let Some(item) = stream.next().await {
139        match item? {
140            PubSubEvent::Event(message) => {
141                let stream_name = stream_name.to_owned();
142                let tenant = tenant.to_owned();
143                let object = object.clone();
144                if store
145                    .with_transaction(|tx| {
146                        Box::pin(async move {
147                            capture_event_in_tx(
148                                tx,
149                                &stream_name,
150                                &tenant,
151                                &object,
152                                message.payload,
153                                &message.replay_id,
154                            )
155                            .await
156                        })
157                    })
158                    .await?
159                {
160                    captured += 1;
161                }
162            }
163            PubSubEvent::KeepAlive | PubSubEvent::Reconnected { .. } => {}
164        }
165    }
166
167    Ok(captured)
168}
169
170/// Loads the stored replay cursor for a CDC stream.
171///
172/// # Errors
173///
174/// Returns an error if the checkpoint cannot be loaded or decoded.
175pub async fn load_replay_id(
176    store: &PgStore,
177    stream_name: &str,
178) -> Result<Option<ReplayId>, ForceSyncError> {
179    let checkpoint = store.get_checkpoint(stream_name).await?;
180    checkpoint
181        .map(|checkpoint| replay_id_from_position(checkpoint.cursor_position))
182        .transpose()
183}
184
#[cfg(test)]
mod tests {
    //! Unit tests for the payload classifier and the replay-id/position codecs.

    use super::*;
    use serde_json::json;

    // ── change_operation ──────────────────────────────────────────────

    #[test]
    fn change_operation_returns_delete_for_delete_change_type() {
        let operation = change_operation(&json!({
            "ChangeEventHeader": { "changeType": "DELETE" }
        }));
        assert_eq!(operation, ChangeOperation::Delete);
    }

    #[test]
    fn change_operation_returns_upsert_for_create_change_type() {
        let operation = change_operation(&json!({
            "ChangeEventHeader": { "changeType": "CREATE" }
        }));
        assert_eq!(operation, ChangeOperation::Upsert);
    }

    #[test]
    fn change_operation_returns_upsert_for_update_change_type() {
        let operation = change_operation(&json!({
            "ChangeEventHeader": { "changeType": "UPDATE" }
        }));
        assert_eq!(operation, ChangeOperation::Upsert);
    }

    #[test]
    fn change_operation_returns_upsert_when_header_missing() {
        let operation = change_operation(&json!({ "Name": "Acme" }));
        assert_eq!(operation, ChangeOperation::Upsert);
    }

    #[test]
    fn change_operation_returns_upsert_when_change_type_missing() {
        let operation = change_operation(&json!({
            "ChangeEventHeader": { "entityName": "Account" }
        }));
        assert_eq!(operation, ChangeOperation::Upsert);
    }

    // ── replay_id_to_position ─────────────────────────────────────────

    #[test]
    fn replay_id_to_position_single_byte() {
        let replay = ReplayId::from_bytes(vec![42]);
        match replay_id_to_position(&replay) {
            Ok(pos) => assert_eq!(pos, 42),
            Err(_) => panic!("expected Ok for single-byte replay id"),
        }
    }

    #[test]
    fn replay_id_to_position_multi_byte() {
        // 0x01 0x00 is 256 in big-endian.
        let replay = ReplayId::from_bytes(vec![1, 0]);
        match replay_id_to_position(&replay) {
            Ok(pos) => assert_eq!(pos, 256),
            Err(_) => panic!("expected Ok for multi-byte replay id"),
        }
    }

    #[test]
    fn replay_id_to_position_full_8_bytes() {
        let replay = ReplayId::from_bytes(vec![0, 0, 0, 0, 0, 0, 1, 0]);
        match replay_id_to_position(&replay) {
            Ok(pos) => assert_eq!(pos, 256),
            Err(_) => panic!("expected Ok for 8-byte replay id"),
        }
    }

    #[test]
    fn replay_id_to_position_rejects_more_than_8_bytes() {
        let replay = ReplayId::from_bytes(vec![0; 9]);
        let outcome = replay_id_to_position(&replay);
        assert!(matches!(
            outcome,
            Err(ForceSyncError::InvalidStoredValue { .. })
        ));
    }

    #[test]
    fn replay_id_to_position_rejects_value_exceeding_i64_max() {
        // Eight 0xFF bytes decode to u64::MAX, which is larger than i64::MAX.
        let replay = ReplayId::from_bytes(vec![0xFF; 8]);
        let outcome = replay_id_to_position(&replay);
        assert!(matches!(
            outcome,
            Err(ForceSyncError::InvalidStoredValue { .. })
        ));
    }

    // ── replay_id_from_position ───────────────────────────────────────

    #[test]
    fn replay_id_from_position_small_value() {
        match replay_id_from_position(42) {
            Ok(replay) => assert_eq!(replay.as_bytes(), &[42]),
            Err(_) => panic!("expected Ok for small position"),
        }
    }

    #[test]
    fn replay_id_from_position_multi_byte_value() {
        match replay_id_from_position(256) {
            Ok(replay) => assert_eq!(replay.as_bytes(), &[1, 0]),
            Err(_) => panic!("expected Ok for multi-byte position"),
        }
    }

    #[test]
    fn replay_id_from_position_zero() {
        match replay_id_from_position(0) {
            Ok(replay) => assert_eq!(replay.as_bytes(), &[0]),
            Err(_) => panic!("expected Ok for zero position"),
        }
    }

    #[test]
    fn replay_id_from_position_rejects_negative() {
        let outcome = replay_id_from_position(-1);
        assert!(matches!(
            outcome,
            Err(ForceSyncError::InvalidStoredValue { .. })
        ));
    }

    // ── round-trip ────────────────────────────────────────────────────

    #[test]
    fn replay_id_round_trip_preserves_value() {
        let original_position: i64 = 123_456;
        let replay = match replay_id_from_position(original_position) {
            Ok(replay) => replay,
            Err(_) => panic!("expected Ok for from_position"),
        };
        let recovered = match replay_id_to_position(&replay) {
            Ok(position) => position,
            Err(_) => panic!("expected Ok for to_position"),
        };
        assert_eq!(recovered, original_position);
    }
}