1use force_pubsub::{PubSubEvent, ReplayId};
4use futures::{Stream, StreamExt};
5use serde_json::Value;
6use tokio_postgres::GenericClient;
7
8use crate::{
9 config::ObjectSync,
10 error::ForceSyncError,
11 identity::SyncKey,
12 model::{ChangeEnvelope, ChangeOperation, SourceCursor, SourceSystem},
13 store::pg::{AppendResult, PgStore},
14};
15
16fn change_operation(payload: &Value) -> ChangeOperation {
17 match payload
18 .get("ChangeEventHeader")
19 .and_then(|header| header.get("changeType"))
20 .and_then(Value::as_str)
21 {
22 Some("DELETE") => ChangeOperation::Delete,
23 _ => ChangeOperation::Upsert,
24 }
25}
26
27fn replay_id_to_position(replay_id: &ReplayId) -> Result<i64, ForceSyncError> {
28 if replay_id.as_bytes().len() > 8 {
29 return Err(ForceSyncError::InvalidStoredValue {
30 field: "replay_id",
31 value: format!("{} bytes", replay_id.as_bytes().len()),
32 });
33 }
34
35 let mut padded = [0u8; 8];
36 let start = 8 - replay_id.as_bytes().len();
37 padded[start..].copy_from_slice(replay_id.as_bytes());
38
39 let unsigned = u64::from_be_bytes(padded);
40 i64::try_from(unsigned).map_err(|_| ForceSyncError::InvalidStoredValue {
41 field: "replay_id",
42 value: unsigned.to_string(),
43 })
44}
45
46fn replay_id_from_position(position: i64) -> Result<ReplayId, ForceSyncError> {
47 let unsigned = u64::try_from(position).map_err(|_| ForceSyncError::InvalidStoredValue {
48 field: "cursor_position",
49 value: position.to_string(),
50 })?;
51 let bytes = unsigned.to_be_bytes();
52 let first_non_zero = bytes.iter().position(|byte| *byte != 0).unwrap_or(7);
53 Ok(ReplayId::from_bytes(bytes[first_non_zero..].to_vec()))
54}
55
56fn build_envelope(
57 tenant: &str,
58 object: &ObjectSync,
59 payload: Value,
60 replay_position: i64,
61) -> Result<ChangeEnvelope, ForceSyncError> {
62 let external_id_field =
63 object
64 .external_id_field()
65 .ok_or(ForceSyncError::MissingConfiguration {
66 field: "external_id_field",
67 })?;
68 let external_id = payload
69 .get(external_id_field)
70 .and_then(Value::as_str)
71 .ok_or_else(|| ForceSyncError::InvalidStoredValue {
72 field: "external_id",
73 value: payload.to_string(),
74 })?;
75
76 Ok(ChangeEnvelope::new(
77 SyncKey::new(
78 tenant.to_owned(),
79 object.object_name().to_owned(),
80 external_id.to_owned(),
81 )?,
82 SourceSystem::Salesforce,
83 change_operation(&payload),
84 chrono::Utc::now(),
85 payload,
86 )
87 .with_cursor(SourceCursor::SalesforceReplayId(replay_position)))
88}
89
90async fn capture_event_in_tx<C>(
91 client: &C,
92 stream_name: &str,
93 tenant: &str,
94 object: &ObjectSync,
95 payload: Value,
96 replay_id: &ReplayId,
97) -> Result<bool, ForceSyncError>
98where
99 C: GenericClient + Sync + ?Sized,
100{
101 let replay_position = replay_id_to_position(replay_id)?;
102 let envelope = build_envelope(tenant, object, payload, replay_position)?;
103 let cursor = envelope
104 .cursor()
105 .ok_or(ForceSyncError::MissingSourceCursor)?
106 .as_db_value();
107
108 let inserted = match PgStore::append_journal_if_new_in_tx(client, &envelope).await? {
109 AppendResult::Inserted { journal_id } => {
110 PgStore::enqueue_apply_task_in_tx(client, journal_id, 0).await?;
111 true
112 }
113 AppendResult::Duplicate => false,
114 };
115
116 PgStore::advance_checkpoint_if_greater_in_tx(client, stream_name, replay_position, &cursor)
117 .await?;
118 Ok(inserted)
119}
120
121pub async fn capture_stream<S>(
127 store: &PgStore,
128 stream_name: &str,
129 tenant: &str,
130 object: &ObjectSync,
131 mut stream: S,
132) -> Result<usize, ForceSyncError>
133where
134 S: Stream<Item = Result<PubSubEvent<Value>, force_pubsub::PubSubError>> + Unpin,
135{
136 let mut captured = 0usize;
137
138 while let Some(item) = stream.next().await {
139 match item? {
140 PubSubEvent::Event(message) => {
141 let stream_name = stream_name.to_owned();
142 let tenant = tenant.to_owned();
143 let object = object.clone();
144 if store
145 .with_transaction(|tx| {
146 Box::pin(async move {
147 capture_event_in_tx(
148 tx,
149 &stream_name,
150 &tenant,
151 &object,
152 message.payload,
153 &message.replay_id,
154 )
155 .await
156 })
157 })
158 .await?
159 {
160 captured += 1;
161 }
162 }
163 PubSubEvent::KeepAlive | PubSubEvent::Reconnected { .. } => {}
164 }
165 }
166
167 Ok(captured)
168}
169
170pub async fn load_replay_id(
176 store: &PgStore,
177 stream_name: &str,
178) -> Result<Option<ReplayId>, ForceSyncError> {
179 let checkpoint = store.get_checkpoint(stream_name).await?;
180 checkpoint
181 .map(|checkpoint| replay_id_from_position(checkpoint.cursor_position))
182 .transpose()
183}
184
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds a payload whose `ChangeEventHeader.changeType` is `change_type`.
    fn payload_with_change_type(change_type: &str) -> Value {
        json!({
            "ChangeEventHeader": {
                "changeType": change_type
            }
        })
    }

    // ---- change_operation ----

    #[test]
    fn change_operation_returns_delete_for_delete_change_type() {
        let event = payload_with_change_type("DELETE");
        assert_eq!(change_operation(&event), ChangeOperation::Delete);
    }

    #[test]
    fn change_operation_returns_upsert_for_create_change_type() {
        let event = payload_with_change_type("CREATE");
        assert_eq!(change_operation(&event), ChangeOperation::Upsert);
    }

    #[test]
    fn change_operation_returns_upsert_for_update_change_type() {
        let event = payload_with_change_type("UPDATE");
        assert_eq!(change_operation(&event), ChangeOperation::Upsert);
    }

    #[test]
    fn change_operation_returns_upsert_when_header_missing() {
        let event = json!({"Name": "Acme"});
        assert_eq!(change_operation(&event), ChangeOperation::Upsert);
    }

    #[test]
    fn change_operation_returns_upsert_when_change_type_missing() {
        // Header is present but carries no `changeType` key.
        let event = json!({
            "ChangeEventHeader": {
                "entityName": "Account"
            }
        });
        assert_eq!(change_operation(&event), ChangeOperation::Upsert);
    }

    // ---- replay_id_to_position ----

    #[test]
    fn replay_id_to_position_single_byte() {
        let id = ReplayId::from_bytes(vec![42]);
        let Ok(position) = replay_id_to_position(&id) else {
            panic!("expected Ok for single-byte replay id");
        };
        assert_eq!(position, 42);
    }

    #[test]
    fn replay_id_to_position_multi_byte() {
        // Big-endian [1, 0] decodes to 256.
        let id = ReplayId::from_bytes(vec![1, 0]);
        let Ok(position) = replay_id_to_position(&id) else {
            panic!("expected Ok for multi-byte replay id");
        };
        assert_eq!(position, 256);
    }

    #[test]
    fn replay_id_to_position_full_8_bytes() {
        let id = ReplayId::from_bytes(vec![0, 0, 0, 0, 0, 0, 1, 0]);
        let Ok(position) = replay_id_to_position(&id) else {
            panic!("expected Ok for 8-byte replay id");
        };
        assert_eq!(position, 256);
    }

    #[test]
    fn replay_id_to_position_rejects_more_than_8_bytes() {
        let id = ReplayId::from_bytes(vec![0; 9]);
        let outcome = replay_id_to_position(&id);
        assert!(matches!(
            outcome,
            Err(ForceSyncError::InvalidStoredValue { .. })
        ));
    }

    #[test]
    fn replay_id_to_position_rejects_value_exceeding_i64_max() {
        // Eight 0xFF bytes decode to u64::MAX, which does not fit in i64.
        let id = ReplayId::from_bytes(vec![0xFF; 8]);
        let outcome = replay_id_to_position(&id);
        assert!(matches!(
            outcome,
            Err(ForceSyncError::InvalidStoredValue { .. })
        ));
    }

    // ---- replay_id_from_position ----

    #[test]
    fn replay_id_from_position_small_value() {
        let Ok(id) = replay_id_from_position(42) else {
            panic!("expected Ok for small position");
        };
        assert_eq!(id.as_bytes(), &[42]);
    }

    #[test]
    fn replay_id_from_position_multi_byte_value() {
        let Ok(id) = replay_id_from_position(256) else {
            panic!("expected Ok for multi-byte position");
        };
        assert_eq!(id.as_bytes(), &[1, 0]);
    }

    #[test]
    fn replay_id_from_position_zero() {
        // Zero must still encode as one byte rather than an empty id.
        let Ok(id) = replay_id_from_position(0) else {
            panic!("expected Ok for zero position");
        };
        assert_eq!(id.as_bytes(), &[0]);
    }

    #[test]
    fn replay_id_from_position_rejects_negative() {
        let outcome = replay_id_from_position(-1);
        assert!(matches!(
            outcome,
            Err(ForceSyncError::InvalidStoredValue { .. })
        ));
    }

    // ---- round trip ----

    #[test]
    fn replay_id_round_trip_preserves_value() {
        let start: i64 = 123_456;
        let Ok(id) = replay_id_from_position(start) else {
            panic!("expected Ok for from_position");
        };
        let Ok(end) = replay_id_to_position(&id) else {
            panic!("expected Ok for to_position");
        };
        assert_eq!(end, start);
    }
}