Skip to main content

force_sync/
runtime.rs

1//! Narrow runtime orchestration for the v0.1 vertical slice.
2
3use std::{collections::HashMap, time::Duration};
4
5use chrono::{DateTime, Utc};
6use force::{auth::Authenticator, client::ForceClient};
7use serde_json::Value;
8
9use crate::{
10    ApplyLane, ObjectSync, PlannerContext,
11    apply::{
12        postgres::project_sync_link,
13        salesforce::{ApplyError, SalesforceApplier},
14    },
15    capture,
16    error::ForceSyncError,
17    identity::SyncKey,
18    model::{ChangeEnvelope, ChangeOperation, SourceCursor, SourceSystem},
19    plan_change, reconcile,
20    store::pg::{LeasedTask, PgStore, SyncConflict},
21};
22
23struct ApplyTaskContext {
24    _journal_id: i64,
25    envelope: ChangeEnvelope,
26    current_payload: Option<Value>,
27}
28
29/// Runtime sync engine for the explicit Postgres-to-Salesforce vertical slice.
30#[derive(Debug, Clone)]
31pub struct SyncEngine<A: Authenticator> {
32    store: PgStore,
33    salesforce: SalesforceApplier<A>,
34    objects: HashMap<String, ObjectSync>,
35    capture_batch_size: i64,
36    capture_priority: i32,
37    apply_batch_size: i64,
38    reconcile_batch_size: i64,
39    lease_for: Duration,
40    worker_id: String,
41}
42
43/// Builder for [`SyncEngine`].
44#[derive(Debug)]
45pub struct SyncEngineBuilder<A: Authenticator> {
46    salesforce_client: ForceClient<A>,
47    postgres: Option<PgStore>,
48    objects: Vec<ObjectSync>,
49    capture_batch_size: i64,
50    capture_priority: i32,
51    apply_batch_size: i64,
52    reconcile_batch_size: i64,
53    lease_for: Duration,
54    worker_id: String,
55}
56
57impl<A: Authenticator> SyncEngine<A> {
58    /// Creates a builder for a runtime using the given Salesforce client.
59    #[must_use]
60    pub fn builder(salesforce_client: ForceClient<A>) -> SyncEngineBuilder<A> {
61        SyncEngineBuilder::new(salesforce_client)
62    }
63
64    /// Captures a single configured batch from the Postgres outbox.
65    ///
66    /// # Errors
67    ///
68    /// Returns an error if the outbox capture fails.
69    pub async fn run_capture_postgres_once(&self) -> Result<usize, ForceSyncError> {
70        capture::postgres::capture_batch(
71            &self.store,
72            self.capture_batch_size,
73            self.capture_priority,
74        )
75        .await
76    }
77
78    /// Leases and applies one configured batch of ready tasks.
79    ///
80    /// # Errors
81    ///
82    /// Returns an error if task loading or control-plane updates fail.
83    pub async fn run_apply_once(&self) -> Result<usize, ForceSyncError> {
84        let leased = self
85            .store
86            .lease_ready_tasks(&self.worker_id, self.apply_batch_size, self.lease_for)
87            .await?;
88        let batch_size = leased.len().max(1);
89        let mut applied = 0usize;
90
91        for task in leased {
92            if self.process_leased_task(&task, batch_size).await? {
93                applied += 1;
94            }
95        }
96
97        Ok(applied)
98    }
99
100    /// Runs one reconciliation pass and enqueues repair tasks for drift.
101    ///
102    /// # Errors
103    ///
104    /// Returns an error if drift detection or task insertion fails.
105    pub async fn run_reconcile_once(&self) -> Result<usize, ForceSyncError> {
106        reconcile::run_reconcile_once(&self.store, self.reconcile_batch_size.max(1)).await
107    }
108
109    async fn process_leased_task(
110        &self,
111        task: &LeasedTask,
112        batch_size: usize,
113    ) -> Result<bool, ForceSyncError> {
114        let context = self.load_apply_task_context(task.task_id).await?;
115        let Some(context) = context else {
116            let _ = self
117                .store
118                .fail_task_for_worker(
119                    &self.worker_id,
120                    task.task_id,
121                    "missing apply task journal row",
122                )
123                .await?;
124            return Ok(false);
125        };
126
127        let object = self
128            .objects
129            .get(context.envelope.sync_key().object_name())
130            .ok_or(ForceSyncError::MissingConfiguration {
131                field: "object sync",
132            })?;
133
134        let existing_link = self
135            .store
136            .get_link(
137                context.envelope.sync_key().tenant(),
138                context.envelope.sync_key().object_name(),
139                context.envelope.sync_key().external_id(),
140            )
141            .await?;
142
143        let decision = plan_change(
144            &PlannerContext {
145                object: object.clone(),
146                current_payload: context.current_payload.clone(),
147                batch_size,
148                urgent: batch_size <= object.lane_thresholds().rest_max_batch_size(),
149                has_dependencies: false,
150            },
151            &context.envelope,
152        );
153
154        self.apply_planned_task(
155            task,
156            &context.envelope,
157            existing_link.as_ref(),
158            object,
159            decision,
160        )
161        .await
162    }
163
164    async fn apply_planned_task(
165        &self,
166        task: &LeasedTask,
167        envelope: &ChangeEnvelope,
168        existing_link: Option<&crate::store::pg::SyncLink>,
169        object: &ObjectSync,
170        decision: crate::plan::PlanDecision,
171    ) -> Result<bool, ForceSyncError> {
172        if decision.lane == ApplyLane::Conflict {
173            for field_name in &decision.conflicts {
174                let conflict = SyncConflict {
175                    tenant: envelope.sync_key().tenant().to_owned(),
176                    object_name: envelope.sync_key().object_name().to_owned(),
177                    external_id: envelope.sync_key().external_id().to_owned(),
178                    field_name: field_name.clone(),
179                    left_value: Value::Null,
180                    right_value: Value::Null,
181                    resolution: None,
182                };
183                self.store.insert_conflict(&conflict).await?;
184            }
185            let _ = self
186                .store
187                .fail_task_for_worker(
188                    &self.worker_id,
189                    task.task_id,
190                    format!("planner conflict: {}", decision.conflicts.join(",")),
191                )
192                .await?;
193            return Ok(false);
194        }
195
196        if decision.lane == ApplyLane::Noop {
197            self.store
198                .ack_task_for_worker(&self.worker_id, task.task_id)
199                .await?;
200            return Ok(false);
201        }
202
203        if should_project_locally(envelope.source(), decision.lane) {
204            return self.apply_local_task(task, envelope, existing_link).await;
205        }
206
207        match decision.lane {
208            ApplyLane::Rest => {
209                let payload = decision
210                    .payload
211                    .as_ref()
212                    .unwrap_or_else(|| envelope.payload());
213                let success = self
214                    .apply_rest_task(task, envelope, payload, existing_link, object)
215                    .await?;
216                Ok(success)
217            }
218            ApplyLane::Bulk => {
219                let payload = decision
220                    .payload
221                    .as_ref()
222                    .unwrap_or_else(|| envelope.payload());
223                let success = self
224                    .apply_bulk_task(task, envelope, payload, existing_link, object)
225                    .await?;
226                Ok(success)
227            }
228            ApplyLane::CompositeGraph => {
229                let _ = self
230                    .store
231                    .fail_task_for_worker(
232                        &self.worker_id,
233                        task.task_id,
234                        format!("unsupported runtime lane: {:?}", decision.lane),
235                    )
236                    .await?;
237                Ok(false)
238            }
239            ApplyLane::Conflict | ApplyLane::Noop => unreachable!("handled above"),
240        }
241    }
242
243    async fn apply_local_task(
244        &self,
245        task: &LeasedTask,
246        envelope: &ChangeEnvelope,
247        existing_link: Option<&crate::store::pg::SyncLink>,
248    ) -> Result<bool, ForceSyncError> {
249        let salesforce_id = local_projection_salesforce_id(existing_link, envelope)?;
250        let link = project_sync_link(
251            existing_link,
252            envelope,
253            salesforce_id.as_ref(),
254            matches!(envelope.operation(), ChangeOperation::Delete),
255        );
256        self.store.put_link(&link).await?;
257        self.store
258            .ack_task_for_worker(&self.worker_id, task.task_id)
259            .await?;
260        Ok(true)
261    }
262
263    async fn apply_bulk_task(
264        &self,
265        task: &LeasedTask,
266        envelope: &ChangeEnvelope,
267        payload: &Value,
268        existing_link: Option<&crate::store::pg::SyncLink>,
269        object: &ObjectSync,
270    ) -> Result<bool, ForceSyncError> {
271        match envelope.operation() {
272            ChangeOperation::Upsert => {
273                let Some(existing_salesforce_id) =
274                    existing_link.and_then(|link| link.salesforce_id.as_deref())
275                else {
276                    return self
277                        .apply_rest_task(task, envelope, payload, existing_link, object)
278                        .await;
279                };
280
281                let job_result = self
282                    .salesforce
283                    .apply_bulk_upsert(
284                        envelope.sync_key().object_name(),
285                        object
286                            .external_id_field()
287                            .ok_or(ForceSyncError::MissingConfiguration {
288                                field: "external_id_field",
289                            })?,
290                        1,
291                        vec![payload.clone()],
292                    )
293                    .await;
294                if let Err(error) = job_result {
295                    return self.handle_apply_error(task, error).await;
296                }
297
298                let Some(salesforce_id) =
299                    force::types::SalesforceId::new(existing_salesforce_id.to_owned()).ok()
300                else {
301                    return self
302                        .apply_rest_task(task, envelope, payload, existing_link, object)
303                        .await;
304                };
305
306                let link = project_sync_link(existing_link, envelope, Some(&salesforce_id), false);
307                self.store.put_link(&link).await?;
308                self.store
309                    .ack_task_for_worker(&self.worker_id, task.task_id)
310                    .await?;
311                Ok(true)
312            }
313            ChangeOperation::Delete => {
314                self.apply_rest_task(task, envelope, payload, existing_link, object)
315                    .await
316            }
317        }
318    }
319
320    async fn apply_rest_task(
321        &self,
322        task: &LeasedTask,
323        envelope: &ChangeEnvelope,
324        payload: &Value,
325        existing_link: Option<&crate::store::pg::SyncLink>,
326        object: &ObjectSync,
327    ) -> Result<bool, ForceSyncError> {
328        match envelope.operation() {
329            ChangeOperation::Upsert => {
330                let result = self
331                    .salesforce
332                    .apply_rest_upsert(
333                        envelope.sync_key().object_name(),
334                        object
335                            .external_id_field()
336                            .ok_or(ForceSyncError::MissingConfiguration {
337                                field: "external_id_field",
338                            })?,
339                        envelope.sync_key().external_id(),
340                        payload,
341                    )
342                    .await;
343
344                match result {
345                    Ok(result) => {
346                        let link = project_sync_link(
347                            existing_link,
348                            envelope,
349                            result.salesforce_id.as_ref(),
350                            false,
351                        );
352                        self.store.put_link(&link).await?;
353                        self.store
354                            .ack_task_for_worker(&self.worker_id, task.task_id)
355                            .await?;
356                        Ok(true)
357                    }
358                    Err(error) => self.handle_apply_error(task, error).await,
359                }
360            }
361            ChangeOperation::Delete => {
362                let Some(existing_link) = existing_link else {
363                    let _ = self
364                        .store
365                        .fail_task_for_worker(
366                            &self.worker_id,
367                            task.task_id,
368                            "missing Salesforce ID for delete",
369                        )
370                        .await?;
371                    return Ok(false);
372                };
373                let Some(salesforce_id) = existing_link.salesforce_id.as_deref() else {
374                    let _ = self
375                        .store
376                        .fail_task_for_worker(
377                            &self.worker_id,
378                            task.task_id,
379                            "missing Salesforce ID for delete",
380                        )
381                        .await?;
382                    return Ok(false);
383                };
384                let salesforce_id = force::types::SalesforceId::new(salesforce_id.to_owned())
385                    .map_err(|error| ForceSyncError::InvalidStoredValue {
386                        field: "salesforce_id",
387                        value: error.to_string(),
388                    })?;
389
390                match self
391                    .salesforce
392                    .apply_rest_delete(envelope.sync_key().object_name(), &salesforce_id)
393                    .await
394                {
395                    Ok(()) => {
396                        let link = project_sync_link(
397                            existing_link.into(),
398                            envelope,
399                            Some(&salesforce_id),
400                            true,
401                        );
402                        self.store.put_link(&link).await?;
403                        self.store
404                            .ack_task_for_worker(&self.worker_id, task.task_id)
405                            .await?;
406                        Ok(true)
407                    }
408                    Err(error) => self.handle_apply_error(task, error).await,
409                }
410            }
411        }
412    }
413
414    async fn handle_apply_error(
415        &self,
416        task: &LeasedTask,
417        error: ApplyError,
418    ) -> Result<bool, ForceSyncError> {
419        let error_message = error.to_string();
420        match error {
421            ApplyError::Retryable(_) => {
422                self.store
423                    .retry_task_for_worker(
424                        &self.worker_id,
425                        task.task_id,
426                        Utc::now() + chrono::Duration::seconds(30),
427                        error_message,
428                    )
429                    .await?;
430            }
431            ApplyError::Permanent(_) => {
432                self.store
433                    .fail_task_for_worker(&self.worker_id, task.task_id, error_message)
434                    .await?;
435            }
436        }
437
438        Ok(false)
439    }
440
441    async fn load_apply_task_context(
442        &self,
443        task_id: i64,
444    ) -> Result<Option<ApplyTaskContext>, ForceSyncError> {
445        let client = self.store.pool().get().await?;
446        let row = client
447            .query_opt(
448                "select
449                    j.journal_id,
450                    j.tenant,
451                    j.object_name,
452                    j.external_id,
453                    j.source,
454                    j.source_cursor,
455                    j.observed_at,
456                    j.operation,
457                    j.payload::text as payload_json,
458                    applied.payload::text as current_payload_json
459                 from sync_task t
460                 join sync_journal j
461                   on (t.payload->>'journal_id')::bigint = j.journal_id
462                 left join sync_link link
463                   on link.tenant = j.tenant
464                  and link.object_name = j.object_name
465                  and link.external_id = j.external_id
466                 left join lateral (
467                    select payload
468                    from sync_journal applied
469                    where applied.tenant = j.tenant
470                      and applied.object_name = j.object_name
471                      and applied.external_id = j.external_id
472                      and link.last_payload_hash is not null
473                      and applied.payload_hash = link.last_payload_hash
474                    order by applied.journal_id desc
475                    limit 1
476                 ) applied on true
477                 where t.task_id = $1",
478                &[&task_id],
479            )
480            .await?;
481
482        row.as_ref().map(build_apply_task_context).transpose()
483    }
484}
485
486impl<A: Authenticator> SyncEngineBuilder<A> {
487    fn new(salesforce_client: ForceClient<A>) -> Self {
488        Self {
489            salesforce_client,
490            postgres: None,
491            objects: Vec::new(),
492            capture_batch_size: 100,
493            capture_priority: 0,
494            apply_batch_size: 1,
495            reconcile_batch_size: 100,
496            lease_for: Duration::from_secs(30),
497            worker_id: "force-sync-apply".to_owned(),
498        }
499    }
500
501    /// Sets the Postgres control-plane store.
502    #[must_use]
503    pub fn postgres(mut self, postgres: PgStore) -> Self {
504        self.postgres = Some(postgres);
505        self
506    }
507
508    /// Adds an object sync definition.
509    #[must_use]
510    pub fn object(mut self, object: ObjectSync) -> Self {
511        self.objects.push(object);
512        self
513    }
514
515    /// Sets the number of records scanned per reconciliation pass.
516    #[must_use]
517    pub const fn reconcile_batch_size(mut self, reconcile_batch_size: i64) -> Self {
518        self.reconcile_batch_size = reconcile_batch_size;
519        self
520    }
521
522    /// Sets the number of tasks leased for each apply pass.
523    #[must_use]
524    pub const fn apply_batch_size(mut self, apply_batch_size: i64) -> Self {
525        self.apply_batch_size = apply_batch_size;
526        self
527    }
528
529    /// Builds the runtime engine.
530    ///
531    /// # Errors
532    ///
533    /// Returns an error if required runtime configuration is missing.
534    pub fn build(self) -> Result<SyncEngine<A>, ForceSyncError> {
535        let store = self
536            .postgres
537            .ok_or(ForceSyncError::MissingConfiguration { field: "postgres" })?;
538        if self.objects.is_empty() {
539            return Err(ForceSyncError::MissingConfiguration {
540                field: "object sync",
541            });
542        }
543
544        Ok(SyncEngine {
545            store,
546            salesforce: SalesforceApplier::new(self.salesforce_client),
547            objects: self
548                .objects
549                .into_iter()
550                .map(|object| (object.object_name().to_owned(), object))
551                .collect(),
552            capture_batch_size: self.capture_batch_size,
553            capture_priority: self.capture_priority,
554            apply_batch_size: self.apply_batch_size,
555            reconcile_batch_size: self.reconcile_batch_size,
556            lease_for: self.lease_for,
557            worker_id: self.worker_id,
558        })
559    }
560}
561
562fn build_apply_task_context(row: &tokio_postgres::Row) -> Result<ApplyTaskContext, ForceSyncError> {
563    let journal_id: i64 = row.get("journal_id");
564    let tenant: String = row.get("tenant");
565    let object_name: String = row.get("object_name");
566    let external_id: String = row.get("external_id");
567    let source: String = row.get("source");
568    let source_cursor: String = row.get("source_cursor");
569    let observed_at: DateTime<Utc> = row.get("observed_at");
570    let operation: String = row.get("operation");
571    let payload_json: String = row.get("payload_json");
572    let current_payload_json: Option<String> = row.get("current_payload_json");
573
574    let sync_key = SyncKey::new(tenant, object_name, external_id)?;
575    let payload = serde_json::from_str(&payload_json)?;
576    let current_payload = match current_payload_json {
577        Some(current_payload_json) => Some(serde_json::from_str(&current_payload_json)?),
578        None => None,
579    };
580    let envelope = ChangeEnvelope::new(
581        sync_key,
582        parse_source_system(&source)?,
583        parse_change_operation(&operation)?,
584        observed_at,
585        payload,
586    )
587    .with_cursor(parse_source_cursor(&source_cursor)?);
588
589    Ok(ApplyTaskContext {
590        _journal_id: journal_id,
591        envelope,
592        current_payload,
593    })
594}
595
596fn should_project_locally(source: SourceSystem, lane: ApplyLane) -> bool {
597    source == SourceSystem::Salesforce && !matches!(lane, ApplyLane::Noop | ApplyLane::Conflict)
598}
599
600fn local_projection_salesforce_id(
601    existing_link: Option<&crate::store::pg::SyncLink>,
602    envelope: &ChangeEnvelope,
603) -> Result<Option<force::types::SalesforceId>, ForceSyncError> {
604    if let Some(existing_salesforce_id) =
605        existing_link.and_then(|link| link.salesforce_id.as_deref())
606    {
607        return force::types::SalesforceId::new(existing_salesforce_id.to_owned())
608            .map(Some)
609            .map_err(|error| ForceSyncError::InvalidStoredValue {
610                field: "salesforce_id",
611                value: error.to_string(),
612            });
613    }
614
615    let Some(payload_salesforce_id) = envelope.payload().get("Id").and_then(Value::as_str) else {
616        return Ok(None);
617    };
618
619    force::types::SalesforceId::new(payload_salesforce_id.to_owned())
620        .map(Some)
621        .map_err(|error| ForceSyncError::InvalidStoredValue {
622            field: "payload.Id",
623            value: error.to_string(),
624        })
625}
626
627fn parse_source_system(value: &str) -> Result<SourceSystem, ForceSyncError> {
628    match value {
629        "salesforce" => Ok(SourceSystem::Salesforce),
630        "postgres" => Ok(SourceSystem::Postgres),
631        _ => Err(ForceSyncError::InvalidStoredValue {
632            field: "source",
633            value: value.to_owned(),
634        }),
635    }
636}
637
638fn parse_change_operation(value: &str) -> Result<ChangeOperation, ForceSyncError> {
639    match value {
640        "upsert" => Ok(ChangeOperation::Upsert),
641        "delete" => Ok(ChangeOperation::Delete),
642        _ => Err(ForceSyncError::InvalidStoredValue {
643            field: "operation",
644            value: value.to_owned(),
645        }),
646    }
647}
648
649fn parse_source_cursor(value: &str) -> Result<SourceCursor, ForceSyncError> {
650    if let Some(replay_id) = value.strip_prefix("salesforce-replay-id:") {
651        let replay_id =
652            replay_id
653                .parse::<i64>()
654                .map_err(|_| ForceSyncError::InvalidStoredValue {
655                    field: "source_cursor",
656                    value: value.to_owned(),
657                })?;
658        return Ok(SourceCursor::SalesforceReplayId(replay_id));
659    }
660
661    if let Some(lsn) = value.strip_prefix("postgres-lsn:") {
662        return Ok(SourceCursor::PostgresLsn(lsn.to_owned()));
663    }
664
665    if let Some(snapshot) = value.strip_prefix("snapshot:") {
666        return Ok(SourceCursor::Snapshot(snapshot.to_owned()));
667    }
668
669    Err(ForceSyncError::InvalidStoredValue {
670        field: "source_cursor",
671        value: value.to_owned(),
672    })
673}
674
675#[cfg(test)]
676mod tests {
677    use chrono::Utc;
678    use serde_json::json;
679
680    use super::{
681        local_projection_salesforce_id, parse_change_operation, parse_source_cursor,
682        parse_source_system, should_project_locally,
683    };
684    use crate::{
685        identity::SyncKey,
686        model::{ChangeEnvelope, ChangeOperation, SourceCursor, SourceSystem},
687        plan::ApplyLane,
688        store::pg::SyncLink,
689    };
690
691    fn envelope(payload: serde_json::Value) -> ChangeEnvelope {
692        ChangeEnvelope::new(
693            SyncKey::new("tenant", "Account", "external-1")
694                .unwrap_or_else(|error| panic!("unexpected sync key construction error: {error}")),
695            SourceSystem::Salesforce,
696            ChangeOperation::Upsert,
697            Utc::now(),
698            payload,
699        )
700    }
701
702    #[test]
703    fn should_not_project_local_noops_back_to_salesforce() {
704        assert!(!should_project_locally(
705            SourceSystem::Salesforce,
706            ApplyLane::Noop
707        ));
708        assert!(!should_project_locally(
709            SourceSystem::Postgres,
710            ApplyLane::Rest
711        ));
712        assert!(should_project_locally(
713            SourceSystem::Salesforce,
714            ApplyLane::Rest
715        ));
716    }
717
718    #[test]
719    fn local_projection_salesforce_id_uses_payload_id_when_link_is_missing() {
720        let envelope = envelope(json!({
721            "Id": "001000000000009AAA",
722            "Name": "Incoming Salesforce"
723        }));
724
725        let salesforce_id = local_projection_salesforce_id(None, &envelope)
726            .unwrap_or_else(|error| panic!("unexpected projection error: {error}"));
727
728        assert_eq!(
729            salesforce_id
730                .as_ref()
731                .map(force::types::SalesforceId::as_str),
732            Some("001000000000009AAA")
733        );
734    }
735
736    #[test]
737    fn local_projection_salesforce_id_prefers_existing_link() {
738        let envelope = envelope(json!({
739            "Id": "001000000000009AAA",
740            "Name": "Incoming Salesforce"
741        }));
742        let existing_link = SyncLink {
743            tenant: "tenant".to_owned(),
744            object_name: "Account".to_owned(),
745            external_id: "external-1".to_owned(),
746            salesforce_id: Some("001000000000001AAA".to_owned()),
747            postgres_id: None,
748            last_source: Some("postgres".to_owned()),
749            last_source_cursor: Some("postgres-lsn:1".to_owned()),
750            last_payload_hash: None,
751            tombstone: false,
752        };
753
754        let salesforce_id = local_projection_salesforce_id(Some(&existing_link), &envelope)
755            .unwrap_or_else(|error| panic!("unexpected projection error: {error}"));
756
757        assert_eq!(
758            salesforce_id
759                .as_ref()
760                .map(force::types::SalesforceId::as_str),
761            Some("001000000000001AAA")
762        );
763    }
764
765    #[test]
766    fn local_projection_returns_none_when_no_link_and_no_payload_id() {
767        let envelope = envelope(json!({"Name": "No Id Field"}));
768        let salesforce_id = local_projection_salesforce_id(None, &envelope)
769            .unwrap_or_else(|error| panic!("unexpected projection error: {error}"));
770        assert!(salesforce_id.is_none());
771    }
772
773    // -- parse_source_system tests --
774
775    #[test]
776    fn parse_source_system_salesforce() {
777        assert_eq!(
778            parse_source_system("salesforce").unwrap_or_else(|e| panic!("unexpected error: {e}")),
779            SourceSystem::Salesforce,
780        );
781    }
782
783    #[test]
784    fn parse_source_system_postgres() {
785        assert_eq!(
786            parse_source_system("postgres").unwrap_or_else(|e| panic!("unexpected error: {e}")),
787            SourceSystem::Postgres,
788        );
789    }
790
791    #[test]
792    fn parse_source_system_unknown_returns_error() {
793        let Err(err) = parse_source_system("oracle") else {
794            panic!("expected error for unknown source system");
795        };
796        assert!(err.to_string().contains("oracle"));
797    }
798
799    // -- parse_change_operation tests --
800
801    #[test]
802    fn parse_change_operation_upsert() {
803        assert_eq!(
804            parse_change_operation("upsert").unwrap_or_else(|e| panic!("unexpected error: {e}")),
805            ChangeOperation::Upsert,
806        );
807    }
808
809    #[test]
810    fn parse_change_operation_delete() {
811        assert_eq!(
812            parse_change_operation("delete").unwrap_or_else(|e| panic!("unexpected error: {e}")),
813            ChangeOperation::Delete,
814        );
815    }
816
817    #[test]
818    fn parse_change_operation_unknown_returns_error() {
819        let Err(err) = parse_change_operation("insert") else {
820            panic!("expected error for unknown change operation");
821        };
822        assert!(err.to_string().contains("insert"));
823    }
824
825    // -- parse_source_cursor tests --
826
827    #[test]
828    fn parse_source_cursor_salesforce_replay_id() {
829        let cursor = parse_source_cursor("salesforce-replay-id:42")
830            .unwrap_or_else(|e| panic!("unexpected error: {e}"));
831        assert_eq!(cursor, SourceCursor::SalesforceReplayId(42));
832    }
833
834    #[test]
835    fn parse_source_cursor_postgres_lsn() {
836        let cursor = parse_source_cursor("postgres-lsn:0/16B3748")
837            .unwrap_or_else(|e| panic!("unexpected error: {e}"));
838        assert_eq!(cursor, SourceCursor::PostgresLsn("0/16B3748".to_owned()));
839    }
840
841    #[test]
842    fn parse_source_cursor_snapshot() {
843        let cursor = parse_source_cursor("snapshot:2024-01-01T00:00:00Z")
844            .unwrap_or_else(|e| panic!("unexpected error: {e}"));
845        assert_eq!(
846            cursor,
847            SourceCursor::Snapshot("2024-01-01T00:00:00Z".to_owned())
848        );
849    }
850
851    #[test]
852    fn parse_source_cursor_invalid_replay_id_returns_error() {
853        let Err(err) = parse_source_cursor("salesforce-replay-id:not-a-number") else {
854            panic!("expected error for invalid replay ID");
855        };
856        assert!(err.to_string().contains("source_cursor"));
857    }
858
859    #[test]
860    fn parse_source_cursor_unknown_prefix_returns_error() {
861        let Err(err) = parse_source_cursor("kafka-offset:99") else {
862            panic!("expected error for unknown cursor prefix");
863        };
864        assert!(err.to_string().contains("kafka-offset:99"));
865    }
866
867    // -- should_project_locally additional coverage --
868
869    #[test]
870    fn should_project_locally_salesforce_bulk() {
871        assert!(should_project_locally(
872            SourceSystem::Salesforce,
873            ApplyLane::Bulk
874        ));
875    }
876
877    #[test]
878    fn should_not_project_locally_salesforce_conflict() {
879        assert!(!should_project_locally(
880            SourceSystem::Salesforce,
881            ApplyLane::Conflict
882        ));
883    }
884
885    #[test]
886    fn should_not_project_locally_postgres_bulk() {
887        assert!(!should_project_locally(
888            SourceSystem::Postgres,
889            ApplyLane::Bulk
890        ));
891    }
892
893    // -- SyncEngineBuilder validation tests --
894
895    mod builder_tests {
896        use async_trait::async_trait;
897        use force::{
898            auth::{AccessToken, Authenticator, TokenResponse},
899            client::builder,
900            error::Result as ForceResult,
901        };
902
903        use crate::{ObjectSync, SyncEngine, error::ForceSyncError};
904
905        #[derive(Debug, Clone)]
906        struct StubAuth;
907
908        #[async_trait]
909        impl Authenticator for StubAuth {
910            async fn authenticate(&self) -> ForceResult<AccessToken> {
911                Ok(AccessToken::from_response(TokenResponse {
912                    access_token: "stub".to_owned(),
913                    instance_url: "https://stub.salesforce.com".to_owned(),
914                    token_type: "Bearer".to_owned(),
915                    issued_at: "1704067200000".to_owned(),
916                    signature: "stub".to_owned(),
917                    expires_in: Some(7200),
918                    refresh_token: None,
919                }))
920            }
921
922            async fn refresh(&self) -> ForceResult<AccessToken> {
923                self.authenticate().await
924            }
925        }
926
927        #[tokio::test]
928        async fn build_fails_without_postgres() {
929            let client = builder()
930                .authenticate(StubAuth)
931                .build()
932                .await
933                .unwrap_or_else(|e| panic!("unexpected client build error: {e}"));
934
935            let result = SyncEngine::builder(client)
936                .object(ObjectSync::new("Account").external_id("ExternalId__c"))
937                .build();
938
939            let Err(err) = result else {
940                panic!("expected error for missing postgres");
941            };
942            assert!(
943                matches!(
944                    err,
945                    ForceSyncError::MissingConfiguration { field: "postgres" }
946                ),
947                "expected MissingConfiguration for postgres, got: {err}"
948            );
949        }
950
951        #[tokio::test]
952        async fn build_fails_with_empty_objects() {
953            let client = builder()
954                .authenticate(StubAuth)
955                .build()
956                .await
957                .unwrap_or_else(|e| panic!("unexpected client build error: {e}"));
958
959            // Create a dummy PgStore -- it won't be used for queries, only for builder validation.
960            let mut config = deadpool_postgres::Config::new();
961            config.url = Some("postgresql://unused:unused@localhost:5432/unused".to_owned());
962            let pool = config
963                .create_pool(
964                    Some(deadpool_postgres::Runtime::Tokio1),
965                    tokio_postgres::NoTls,
966                )
967                .unwrap_or_else(|e| panic!("unexpected pool error: {e}"));
968            let store = crate::PgStore::new(pool);
969
970            let result = SyncEngine::builder(client).postgres(store).build();
971
972            let Err(err) = result else {
973                panic!("expected error for empty objects");
974            };
975            assert!(
976                matches!(
977                    err,
978                    ForceSyncError::MissingConfiguration {
979                        field: "object sync"
980                    }
981                ),
982                "expected MissingConfiguration for object sync, got: {err}"
983            );
984        }
985    }
986}