Skip to main content

force_sync/
runtime.rs

1//! Narrow runtime orchestration for the v0.1 vertical slice.
2
3use std::{collections::HashMap, time::Duration};
4
5use chrono::{DateTime, Utc};
6use force::{auth::Authenticator, client::ForceClient};
7use serde_json::Value;
8
9use crate::{
10    apply::{
11        postgres::project_sync_link,
12        salesforce::{ApplyError, SalesforceApplier},
13    },
14    capture,
15    config::ObjectSync,
16    error::ForceSyncError,
17    identity::SyncKey,
18    model::{ChangeEnvelope, ChangeOperation, SourceCursor, SourceSystem},
19    plan::{ApplyLane, PlannerContext, plan_change},
20    reconcile,
21    store::pg::{LeasedTask, PgStore, SyncConflict},
22};
23
24struct ApplyTaskContext {
25    _journal_id: i64,
26    envelope: ChangeEnvelope,
27    current_payload: Option<Value>,
28}
29
30/// Runtime sync engine for the explicit Postgres-to-Salesforce vertical slice.
31#[derive(Debug, Clone)]
32pub struct SyncEngine<A: Authenticator> {
33    store: PgStore,
34    salesforce: SalesforceApplier<A>,
35    objects: HashMap<String, ObjectSync>,
36    capture_batch_size: i64,
37    capture_priority: i32,
38    apply_batch_size: i64,
39    reconcile_batch_size: i64,
40    lease_for: Duration,
41    worker_id: String,
42}
43
44/// Builder for [`SyncEngine`].
45#[derive(Debug)]
46pub struct SyncEngineBuilder<A: Authenticator> {
47    salesforce_client: ForceClient<A>,
48    postgres: Option<PgStore>,
49    objects: Vec<ObjectSync>,
50    capture_batch_size: i64,
51    capture_priority: i32,
52    apply_batch_size: i64,
53    reconcile_batch_size: i64,
54    lease_for: Duration,
55    worker_id: String,
56}
57
58impl<A: Authenticator> SyncEngine<A> {
59    /// Creates a builder for a runtime using the given Salesforce client.
60    #[must_use]
61    pub fn builder(salesforce_client: ForceClient<A>) -> SyncEngineBuilder<A> {
62        SyncEngineBuilder::new(salesforce_client)
63    }
64
65    /// Captures a single configured batch from the Postgres outbox.
66    ///
67    /// # Errors
68    ///
69    /// Returns an error if the outbox capture fails.
70    pub async fn run_capture_postgres_once(&self) -> Result<usize, ForceSyncError> {
71        capture::postgres::capture_batch(
72            &self.store,
73            self.capture_batch_size,
74            self.capture_priority,
75        )
76        .await
77    }
78
79    /// Leases and applies one configured batch of ready tasks.
80    ///
81    /// # Errors
82    ///
83    /// Returns an error if task loading or control-plane updates fail.
84    pub async fn run_apply_once(&self) -> Result<usize, ForceSyncError> {
85        let leased = self
86            .store
87            .lease_ready_tasks(&self.worker_id, self.apply_batch_size, self.lease_for)
88            .await?;
89        let batch_size = leased.len().max(1);
90        let mut applied = 0usize;
91
92        for task in leased {
93            if self.process_leased_task(&task, batch_size).await? {
94                applied += 1;
95            }
96        }
97
98        Ok(applied)
99    }
100
101    /// Runs one reconciliation pass and enqueues repair tasks for drift.
102    ///
103    /// # Errors
104    ///
105    /// Returns an error if drift detection or task insertion fails.
106    pub async fn run_reconcile_once(&self) -> Result<usize, ForceSyncError> {
107        reconcile::run_reconcile_once(&self.store, self.reconcile_batch_size.max(1)).await
108    }
109
110    async fn process_leased_task(
111        &self,
112        task: &LeasedTask,
113        batch_size: usize,
114    ) -> Result<bool, ForceSyncError> {
115        let context = self.load_apply_task_context(task.task_id).await?;
116        let Some(context) = context else {
117            let _ = self
118                .store
119                .fail_task_for_worker(
120                    &self.worker_id,
121                    task.task_id,
122                    "missing apply task journal row",
123                )
124                .await?;
125            return Ok(false);
126        };
127
128        let object = self
129            .objects
130            .get(context.envelope.sync_key().object_name())
131            .ok_or(ForceSyncError::MissingConfiguration {
132                field: "object sync",
133            })?;
134
135        let existing_link = self
136            .store
137            .get_link(
138                context.envelope.sync_key().tenant(),
139                context.envelope.sync_key().object_name(),
140                context.envelope.sync_key().external_id(),
141            )
142            .await?;
143
144        let decision = plan_change(
145            &PlannerContext {
146                object: object.clone(),
147                current_payload: context.current_payload.clone(),
148                batch_size,
149                urgent: batch_size <= object.lane_thresholds().rest_max_batch_size(),
150                has_dependencies: false,
151            },
152            &context.envelope,
153        );
154
155        self.apply_planned_task(
156            task,
157            &context.envelope,
158            existing_link.as_ref(),
159            object,
160            decision,
161        )
162        .await
163    }
164
165    async fn apply_planned_task(
166        &self,
167        task: &LeasedTask,
168        envelope: &ChangeEnvelope,
169        existing_link: Option<&crate::store::pg::SyncLink>,
170        object: &ObjectSync,
171        decision: crate::plan::PlanDecision,
172    ) -> Result<bool, ForceSyncError> {
173        if decision.lane == ApplyLane::Conflict {
174            for field_name in &decision.conflicts {
175                let conflict = SyncConflict {
176                    tenant: envelope.sync_key().tenant().to_owned(),
177                    object_name: envelope.sync_key().object_name().to_owned(),
178                    external_id: envelope.sync_key().external_id().to_owned(),
179                    field_name: field_name.clone(),
180                    left_value: Value::Null,
181                    right_value: Value::Null,
182                    resolution: None,
183                };
184                self.store.insert_conflict(&conflict).await?;
185            }
186            let _ = self
187                .store
188                .fail_task_for_worker(
189                    &self.worker_id,
190                    task.task_id,
191                    format!("planner conflict: {}", decision.conflicts.join(",")),
192                )
193                .await?;
194            return Ok(false);
195        }
196
197        if decision.lane == ApplyLane::Noop {
198            self.store
199                .ack_task_for_worker(&self.worker_id, task.task_id)
200                .await?;
201            return Ok(false);
202        }
203
204        if should_project_locally(envelope.source(), decision.lane) {
205            return self.apply_local_task(task, envelope, existing_link).await;
206        }
207
208        match decision.lane {
209            ApplyLane::Rest => {
210                let payload = decision
211                    .payload
212                    .as_ref()
213                    .unwrap_or_else(|| envelope.payload());
214                let success = self
215                    .apply_rest_task(task, envelope, payload, existing_link, object)
216                    .await?;
217                Ok(success)
218            }
219            ApplyLane::Bulk => {
220                let payload = decision
221                    .payload
222                    .as_ref()
223                    .unwrap_or_else(|| envelope.payload());
224                let success = self
225                    .apply_bulk_task(task, envelope, payload, existing_link, object)
226                    .await?;
227                Ok(success)
228            }
229            ApplyLane::CompositeGraph => {
230                let _ = self
231                    .store
232                    .fail_task_for_worker(
233                        &self.worker_id,
234                        task.task_id,
235                        format!("unsupported runtime lane: {:?}", decision.lane),
236                    )
237                    .await?;
238                Ok(false)
239            }
240            ApplyLane::Conflict | ApplyLane::Noop => unreachable!("handled above"),
241        }
242    }
243
244    async fn apply_local_task(
245        &self,
246        task: &LeasedTask,
247        envelope: &ChangeEnvelope,
248        existing_link: Option<&crate::store::pg::SyncLink>,
249    ) -> Result<bool, ForceSyncError> {
250        let salesforce_id = local_projection_salesforce_id(existing_link, envelope)?;
251        let link = project_sync_link(
252            existing_link,
253            envelope,
254            salesforce_id.as_ref(),
255            matches!(envelope.operation(), ChangeOperation::Delete),
256        );
257        self.store.put_link(&link).await?;
258        self.store
259            .ack_task_for_worker(&self.worker_id, task.task_id)
260            .await?;
261        Ok(true)
262    }
263
264    async fn apply_bulk_task(
265        &self,
266        task: &LeasedTask,
267        envelope: &ChangeEnvelope,
268        payload: &Value,
269        existing_link: Option<&crate::store::pg::SyncLink>,
270        object: &ObjectSync,
271    ) -> Result<bool, ForceSyncError> {
272        match envelope.operation() {
273            ChangeOperation::Upsert => {
274                let Some(existing_salesforce_id) =
275                    existing_link.and_then(|link| link.salesforce_id.as_deref())
276                else {
277                    return self
278                        .apply_rest_task(task, envelope, payload, existing_link, object)
279                        .await;
280                };
281
282                let job_result = self
283                    .salesforce
284                    .apply_bulk_upsert(
285                        envelope.sync_key().object_name(),
286                        object
287                            .external_id_field()
288                            .ok_or(ForceSyncError::MissingConfiguration {
289                                field: "external_id_field",
290                            })?,
291                        1,
292                        vec![payload.clone()],
293                    )
294                    .await;
295                if let Err(error) = job_result {
296                    return self.handle_apply_error(task, error).await;
297                }
298
299                let Some(salesforce_id) =
300                    force::types::SalesforceId::new(existing_salesforce_id.to_owned()).ok()
301                else {
302                    return self
303                        .apply_rest_task(task, envelope, payload, existing_link, object)
304                        .await;
305                };
306
307                let link = project_sync_link(existing_link, envelope, Some(&salesforce_id), false);
308                self.store.put_link(&link).await?;
309                self.store
310                    .ack_task_for_worker(&self.worker_id, task.task_id)
311                    .await?;
312                Ok(true)
313            }
314            ChangeOperation::Delete => {
315                self.apply_rest_task(task, envelope, payload, existing_link, object)
316                    .await
317            }
318        }
319    }
320
321    async fn apply_rest_task(
322        &self,
323        task: &LeasedTask,
324        envelope: &ChangeEnvelope,
325        payload: &Value,
326        existing_link: Option<&crate::store::pg::SyncLink>,
327        object: &ObjectSync,
328    ) -> Result<bool, ForceSyncError> {
329        match envelope.operation() {
330            ChangeOperation::Upsert => {
331                let result = self
332                    .salesforce
333                    .apply_rest_upsert(
334                        envelope.sync_key().object_name(),
335                        object
336                            .external_id_field()
337                            .ok_or(ForceSyncError::MissingConfiguration {
338                                field: "external_id_field",
339                            })?,
340                        envelope.sync_key().external_id(),
341                        payload,
342                    )
343                    .await;
344
345                match result {
346                    Ok(result) => {
347                        let link = project_sync_link(
348                            existing_link,
349                            envelope,
350                            result.salesforce_id.as_ref(),
351                            false,
352                        );
353                        self.store.put_link(&link).await?;
354                        self.store
355                            .ack_task_for_worker(&self.worker_id, task.task_id)
356                            .await?;
357                        Ok(true)
358                    }
359                    Err(error) => self.handle_apply_error(task, error).await,
360                }
361            }
362            ChangeOperation::Delete => {
363                let Some(existing_link) = existing_link else {
364                    let _ = self
365                        .store
366                        .fail_task_for_worker(
367                            &self.worker_id,
368                            task.task_id,
369                            "missing Salesforce ID for delete",
370                        )
371                        .await?;
372                    return Ok(false);
373                };
374                let Some(salesforce_id) = existing_link.salesforce_id.as_deref() else {
375                    let _ = self
376                        .store
377                        .fail_task_for_worker(
378                            &self.worker_id,
379                            task.task_id,
380                            "missing Salesforce ID for delete",
381                        )
382                        .await?;
383                    return Ok(false);
384                };
385                let salesforce_id = force::types::SalesforceId::new(salesforce_id.to_owned())
386                    .map_err(|error| ForceSyncError::InvalidStoredValue {
387                        field: "salesforce_id",
388                        value: error.to_string(),
389                    })?;
390
391                match self
392                    .salesforce
393                    .apply_rest_delete(envelope.sync_key().object_name(), &salesforce_id)
394                    .await
395                {
396                    Ok(()) => {
397                        let link = project_sync_link(
398                            existing_link.into(),
399                            envelope,
400                            Some(&salesforce_id),
401                            true,
402                        );
403                        self.store.put_link(&link).await?;
404                        self.store
405                            .ack_task_for_worker(&self.worker_id, task.task_id)
406                            .await?;
407                        Ok(true)
408                    }
409                    Err(error) => self.handle_apply_error(task, error).await,
410                }
411            }
412        }
413    }
414
415    async fn handle_apply_error(
416        &self,
417        task: &LeasedTask,
418        error: ApplyError,
419    ) -> Result<bool, ForceSyncError> {
420        let error_message = error.to_string();
421        match error {
422            ApplyError::Retryable(_) => {
423                self.store
424                    .retry_task_for_worker(
425                        &self.worker_id,
426                        task.task_id,
427                        Utc::now() + chrono::Duration::seconds(30),
428                        error_message,
429                    )
430                    .await?;
431            }
432            ApplyError::Permanent(_) => {
433                self.store
434                    .fail_task_for_worker(&self.worker_id, task.task_id, error_message)
435                    .await?;
436            }
437        }
438
439        Ok(false)
440    }
441
442    async fn load_apply_task_context(
443        &self,
444        task_id: i64,
445    ) -> Result<Option<ApplyTaskContext>, ForceSyncError> {
446        let client = self.store.pool().get().await?;
447        let row = client
448            .query_opt(
449                "select
450                    j.journal_id,
451                    j.tenant,
452                    j.object_name,
453                    j.external_id,
454                    j.source,
455                    j.source_cursor,
456                    j.observed_at,
457                    j.operation,
458                    j.payload::text as payload_json,
459                    applied.payload::text as current_payload_json
460                 from sync_task t
461                 join sync_journal j
462                   on (t.payload->>'journal_id')::bigint = j.journal_id
463                 left join sync_link link
464                   on link.tenant = j.tenant
465                  and link.object_name = j.object_name
466                  and link.external_id = j.external_id
467                 left join lateral (
468                    select payload
469                    from sync_journal applied
470                    where applied.tenant = j.tenant
471                      and applied.object_name = j.object_name
472                      and applied.external_id = j.external_id
473                      and link.last_payload_hash is not null
474                      and applied.payload_hash = link.last_payload_hash
475                    order by applied.journal_id desc
476                    limit 1
477                 ) applied on true
478                 where t.task_id = $1",
479                &[&task_id],
480            )
481            .await?;
482
483        row.as_ref().map(build_apply_task_context).transpose()
484    }
485}
486
487impl<A: Authenticator> SyncEngineBuilder<A> {
488    fn new(salesforce_client: ForceClient<A>) -> Self {
489        Self {
490            salesforce_client,
491            postgres: None,
492            objects: Vec::new(),
493            capture_batch_size: 100,
494            capture_priority: 0,
495            apply_batch_size: 1,
496            reconcile_batch_size: 100,
497            lease_for: Duration::from_secs(30),
498            worker_id: "force-sync-apply".to_owned(),
499        }
500    }
501
502    /// Sets the Postgres control-plane store.
503    #[must_use]
504    pub fn postgres(mut self, postgres: PgStore) -> Self {
505        self.postgres = Some(postgres);
506        self
507    }
508
509    /// Adds an object sync definition.
510    #[must_use]
511    pub fn object(mut self, object: ObjectSync) -> Self {
512        self.objects.push(object);
513        self
514    }
515
516    /// Sets the number of records scanned per reconciliation pass.
517    #[must_use]
518    pub const fn reconcile_batch_size(mut self, reconcile_batch_size: i64) -> Self {
519        self.reconcile_batch_size = reconcile_batch_size;
520        self
521    }
522
523    /// Sets the number of tasks leased for each apply pass.
524    #[must_use]
525    pub const fn apply_batch_size(mut self, apply_batch_size: i64) -> Self {
526        self.apply_batch_size = apply_batch_size;
527        self
528    }
529
530    /// Builds the runtime engine.
531    ///
532    /// # Errors
533    ///
534    /// Returns an error if required runtime configuration is missing.
535    pub fn build(self) -> Result<SyncEngine<A>, ForceSyncError> {
536        let store = self
537            .postgres
538            .ok_or(ForceSyncError::MissingConfiguration { field: "postgres" })?;
539        if self.objects.is_empty() {
540            return Err(ForceSyncError::MissingConfiguration {
541                field: "object sync",
542            });
543        }
544
545        Ok(SyncEngine {
546            store,
547            salesforce: SalesforceApplier::new(self.salesforce_client),
548            objects: self
549                .objects
550                .into_iter()
551                .map(|object| (object.object_name().to_owned(), object))
552                .collect(),
553            capture_batch_size: self.capture_batch_size,
554            capture_priority: self.capture_priority,
555            apply_batch_size: self.apply_batch_size,
556            reconcile_batch_size: self.reconcile_batch_size,
557            lease_for: self.lease_for,
558            worker_id: self.worker_id,
559        })
560    }
561}
562
563fn build_apply_task_context(row: &tokio_postgres::Row) -> Result<ApplyTaskContext, ForceSyncError> {
564    let journal_id: i64 = row.get("journal_id");
565    let tenant: String = row.get("tenant");
566    let object_name: String = row.get("object_name");
567    let external_id: String = row.get("external_id");
568    let source: String = row.get("source");
569    let source_cursor: String = row.get("source_cursor");
570    let observed_at: DateTime<Utc> = row.get("observed_at");
571    let operation: String = row.get("operation");
572    let payload_json: String = row.get("payload_json");
573    let current_payload_json: Option<String> = row.get("current_payload_json");
574
575    let sync_key = SyncKey::new(tenant, object_name, external_id)?;
576    let payload = serde_json::from_str(&payload_json)?;
577    let current_payload = match current_payload_json {
578        Some(current_payload_json) => Some(serde_json::from_str(&current_payload_json)?),
579        None => None,
580    };
581    let envelope = ChangeEnvelope::new(
582        sync_key,
583        parse_source_system(&source)?,
584        parse_change_operation(&operation)?,
585        observed_at,
586        payload,
587    )
588    .with_cursor(parse_source_cursor(&source_cursor)?);
589
590    Ok(ApplyTaskContext {
591        _journal_id: journal_id,
592        envelope,
593        current_payload,
594    })
595}
596
597fn should_project_locally(source: SourceSystem, lane: ApplyLane) -> bool {
598    source == SourceSystem::Salesforce && !matches!(lane, ApplyLane::Noop | ApplyLane::Conflict)
599}
600
601fn local_projection_salesforce_id(
602    existing_link: Option<&crate::store::pg::SyncLink>,
603    envelope: &ChangeEnvelope,
604) -> Result<Option<force::types::SalesforceId>, ForceSyncError> {
605    if let Some(existing_salesforce_id) =
606        existing_link.and_then(|link| link.salesforce_id.as_deref())
607    {
608        return force::types::SalesforceId::new(existing_salesforce_id.to_owned())
609            .map(Some)
610            .map_err(|error| ForceSyncError::InvalidStoredValue {
611                field: "salesforce_id",
612                value: error.to_string(),
613            });
614    }
615
616    let Some(payload_salesforce_id) = envelope.payload().get("Id").and_then(Value::as_str) else {
617        return Ok(None);
618    };
619
620    force::types::SalesforceId::new(payload_salesforce_id.to_owned())
621        .map(Some)
622        .map_err(|error| ForceSyncError::InvalidStoredValue {
623            field: "payload.Id",
624            value: error.to_string(),
625        })
626}
627
628fn parse_source_system(value: &str) -> Result<SourceSystem, ForceSyncError> {
629    match value {
630        "salesforce" => Ok(SourceSystem::Salesforce),
631        "postgres" => Ok(SourceSystem::Postgres),
632        _ => Err(ForceSyncError::InvalidStoredValue {
633            field: "source",
634            value: value.to_owned(),
635        }),
636    }
637}
638
639fn parse_change_operation(value: &str) -> Result<ChangeOperation, ForceSyncError> {
640    match value {
641        "upsert" => Ok(ChangeOperation::Upsert),
642        "delete" => Ok(ChangeOperation::Delete),
643        _ => Err(ForceSyncError::InvalidStoredValue {
644            field: "operation",
645            value: value.to_owned(),
646        }),
647    }
648}
649
650fn parse_source_cursor(value: &str) -> Result<SourceCursor, ForceSyncError> {
651    if let Some(replay_id) = value.strip_prefix("salesforce-replay-id:") {
652        let replay_id =
653            replay_id
654                .parse::<i64>()
655                .map_err(|_| ForceSyncError::InvalidStoredValue {
656                    field: "source_cursor",
657                    value: value.to_owned(),
658                })?;
659        return Ok(SourceCursor::SalesforceReplayId(replay_id));
660    }
661
662    if let Some(lsn) = value.strip_prefix("postgres-lsn:") {
663        return Ok(SourceCursor::PostgresLsn(lsn.to_owned()));
664    }
665
666    if let Some(snapshot) = value.strip_prefix("snapshot:") {
667        return Ok(SourceCursor::Snapshot(snapshot.to_owned()));
668    }
669
670    Err(ForceSyncError::InvalidStoredValue {
671        field: "source_cursor",
672        value: value.to_owned(),
673    })
674}
675
676#[cfg(test)]
677mod tests {
678    use chrono::Utc;
679    use serde_json::json;
680
681    use super::{
682        local_projection_salesforce_id, parse_change_operation, parse_source_cursor,
683        parse_source_system, should_project_locally,
684    };
685    use crate::{
686        identity::SyncKey,
687        model::{ChangeEnvelope, ChangeOperation, SourceCursor, SourceSystem},
688        plan::ApplyLane,
689        store::pg::SyncLink,
690    };
691
692    fn envelope(payload: serde_json::Value) -> ChangeEnvelope {
693        ChangeEnvelope::new(
694            SyncKey::new("tenant", "Account", "external-1")
695                .unwrap_or_else(|error| panic!("unexpected sync key construction error: {error}")),
696            SourceSystem::Salesforce,
697            ChangeOperation::Upsert,
698            Utc::now(),
699            payload,
700        )
701    }
702
703    #[test]
704    fn should_not_project_local_noops_back_to_salesforce() {
705        assert!(!should_project_locally(
706            SourceSystem::Salesforce,
707            ApplyLane::Noop
708        ));
709        assert!(!should_project_locally(
710            SourceSystem::Postgres,
711            ApplyLane::Rest
712        ));
713        assert!(should_project_locally(
714            SourceSystem::Salesforce,
715            ApplyLane::Rest
716        ));
717    }
718
719    #[test]
720    fn local_projection_salesforce_id_uses_payload_id_when_link_is_missing() {
721        let envelope = envelope(json!({
722            "Id": "001000000000009AAA",
723            "Name": "Incoming Salesforce"
724        }));
725
726        let salesforce_id = local_projection_salesforce_id(None, &envelope)
727            .unwrap_or_else(|error| panic!("unexpected projection error: {error}"));
728
729        assert_eq!(
730            salesforce_id
731                .as_ref()
732                .map(force::types::SalesforceId::as_str),
733            Some("001000000000009AAA")
734        );
735    }
736
737    #[test]
738    fn local_projection_salesforce_id_prefers_existing_link() {
739        let envelope = envelope(json!({
740            "Id": "001000000000009AAA",
741            "Name": "Incoming Salesforce"
742        }));
743        let existing_link = SyncLink {
744            tenant: "tenant".to_owned(),
745            object_name: "Account".to_owned(),
746            external_id: "external-1".to_owned(),
747            salesforce_id: Some("001000000000001AAA".to_owned()),
748            postgres_id: None,
749            last_source: Some("postgres".to_owned()),
750            last_source_cursor: Some("postgres-lsn:1".to_owned()),
751            last_payload_hash: None,
752            tombstone: false,
753        };
754
755        let salesforce_id = local_projection_salesforce_id(Some(&existing_link), &envelope)
756            .unwrap_or_else(|error| panic!("unexpected projection error: {error}"));
757
758        assert_eq!(
759            salesforce_id
760                .as_ref()
761                .map(force::types::SalesforceId::as_str),
762            Some("001000000000001AAA")
763        );
764    }
765
766    #[test]
767    fn local_projection_returns_none_when_no_link_and_no_payload_id() {
768        let envelope = envelope(json!({"Name": "No Id Field"}));
769        let salesforce_id = local_projection_salesforce_id(None, &envelope)
770            .unwrap_or_else(|error| panic!("unexpected projection error: {error}"));
771        assert!(salesforce_id.is_none());
772    }
773
774    // -- parse_source_system tests --
775
776    #[test]
777    fn parse_source_system_salesforce() {
778        assert_eq!(
779            parse_source_system("salesforce").unwrap_or_else(|e| panic!("unexpected error: {e}")),
780            SourceSystem::Salesforce,
781        );
782    }
783
784    #[test]
785    fn parse_source_system_postgres() {
786        assert_eq!(
787            parse_source_system("postgres").unwrap_or_else(|e| panic!("unexpected error: {e}")),
788            SourceSystem::Postgres,
789        );
790    }
791
792    #[test]
793    fn parse_source_system_unknown_returns_error() {
794        let Err(err) = parse_source_system("oracle") else {
795            panic!("expected error for unknown source system");
796        };
797        assert!(err.to_string().contains("oracle"));
798    }
799
800    // -- parse_change_operation tests --
801
802    #[test]
803    fn parse_change_operation_upsert() {
804        assert_eq!(
805            parse_change_operation("upsert").unwrap_or_else(|e| panic!("unexpected error: {e}")),
806            ChangeOperation::Upsert,
807        );
808    }
809
810    #[test]
811    fn parse_change_operation_delete() {
812        assert_eq!(
813            parse_change_operation("delete").unwrap_or_else(|e| panic!("unexpected error: {e}")),
814            ChangeOperation::Delete,
815        );
816    }
817
818    #[test]
819    fn parse_change_operation_unknown_returns_error() {
820        let Err(err) = parse_change_operation("insert") else {
821            panic!("expected error for unknown change operation");
822        };
823        assert!(err.to_string().contains("insert"));
824    }
825
826    // -- parse_source_cursor tests --
827
828    #[test]
829    fn parse_source_cursor_salesforce_replay_id() {
830        let cursor = parse_source_cursor("salesforce-replay-id:42")
831            .unwrap_or_else(|e| panic!("unexpected error: {e}"));
832        assert_eq!(cursor, SourceCursor::SalesforceReplayId(42));
833    }
834
835    #[test]
836    fn parse_source_cursor_postgres_lsn() {
837        let cursor = parse_source_cursor("postgres-lsn:0/16B3748")
838            .unwrap_or_else(|e| panic!("unexpected error: {e}"));
839        assert_eq!(cursor, SourceCursor::PostgresLsn("0/16B3748".to_owned()));
840    }
841
842    #[test]
843    fn parse_source_cursor_snapshot() {
844        let cursor = parse_source_cursor("snapshot:2024-01-01T00:00:00Z")
845            .unwrap_or_else(|e| panic!("unexpected error: {e}"));
846        assert_eq!(
847            cursor,
848            SourceCursor::Snapshot("2024-01-01T00:00:00Z".to_owned())
849        );
850    }
851
852    #[test]
853    fn parse_source_cursor_invalid_replay_id_returns_error() {
854        let Err(err) = parse_source_cursor("salesforce-replay-id:not-a-number") else {
855            panic!("expected error for invalid replay ID");
856        };
857        assert!(err.to_string().contains("source_cursor"));
858    }
859
860    #[test]
861    fn parse_source_cursor_unknown_prefix_returns_error() {
862        let Err(err) = parse_source_cursor("kafka-offset:99") else {
863            panic!("expected error for unknown cursor prefix");
864        };
865        assert!(err.to_string().contains("kafka-offset:99"));
866    }
867
868    // -- should_project_locally additional coverage --
869
870    #[test]
871    fn should_project_locally_salesforce_bulk() {
872        assert!(should_project_locally(
873            SourceSystem::Salesforce,
874            ApplyLane::Bulk
875        ));
876    }
877
878    #[test]
879    fn should_not_project_locally_salesforce_conflict() {
880        assert!(!should_project_locally(
881            SourceSystem::Salesforce,
882            ApplyLane::Conflict
883        ));
884    }
885
886    #[test]
887    fn should_not_project_locally_postgres_bulk() {
888        assert!(!should_project_locally(
889            SourceSystem::Postgres,
890            ApplyLane::Bulk
891        ));
892    }
893
894    // -- SyncEngineBuilder validation tests --
895
896    mod builder_tests {
897        use async_trait::async_trait;
898        use force::{
899            auth::{AccessToken, Authenticator, TokenResponse},
900            client::builder,
901            error::Result as ForceResult,
902        };
903
904        use crate::{config::ObjectSync, error::ForceSyncError, runtime::SyncEngine};
905
906        #[derive(Debug, Clone)]
907        struct StubAuth;
908
909        #[async_trait]
910        impl Authenticator for StubAuth {
911            async fn authenticate(&self) -> ForceResult<AccessToken> {
912                Ok(AccessToken::from_response(TokenResponse {
913                    access_token: "stub".to_owned(),
914                    instance_url: "https://stub.salesforce.com".to_owned(),
915                    token_type: "Bearer".to_owned(),
916                    issued_at: "1704067200000".to_owned(),
917                    signature: "stub".to_owned(),
918                    expires_in: Some(7200),
919                    refresh_token: None,
920                }))
921            }
922
923            async fn refresh(&self) -> ForceResult<AccessToken> {
924                self.authenticate().await
925            }
926        }
927
928        #[tokio::test]
929        async fn build_fails_without_postgres() {
930            let client = builder()
931                .authenticate(StubAuth)
932                .build()
933                .await
934                .unwrap_or_else(|e| panic!("unexpected client build error: {e}"));
935
936            let result = SyncEngine::builder(client)
937                .object(ObjectSync::new("Account").external_id("ExternalId__c"))
938                .build();
939
940            let Err(err) = result else {
941                panic!("expected error for missing postgres");
942            };
943            assert!(
944                matches!(
945                    err,
946                    ForceSyncError::MissingConfiguration { field: "postgres" }
947                ),
948                "expected MissingConfiguration for postgres, got: {err}"
949            );
950        }
951
952        #[tokio::test]
953        async fn build_fails_with_empty_objects() {
954            let client = builder()
955                .authenticate(StubAuth)
956                .build()
957                .await
958                .unwrap_or_else(|e| panic!("unexpected client build error: {e}"));
959
960            // Create a dummy PgStore -- it won't be used for queries, only for builder validation.
961            let mut config = deadpool_postgres::Config::new();
962            config.url = Some("postgresql://unused:unused@localhost:5432/unused".to_owned());
963            let pool = config
964                .create_pool(
965                    Some(deadpool_postgres::Runtime::Tokio1),
966                    tokio_postgres::NoTls,
967                )
968                .unwrap_or_else(|e| panic!("unexpected pool error: {e}"));
969            let store = crate::store::pg::PgStore::new(pool);
970
971            let result = SyncEngine::builder(client).postgres(store).build();
972
973            let Err(err) = result else {
974                panic!("expected error for empty objects");
975            };
976            assert!(
977                matches!(
978                    err,
979                    ForceSyncError::MissingConfiguration {
980                        field: "object sync"
981                    }
982                ),
983                "expected MissingConfiguration for object sync, got: {err}"
984            );
985        }
986    }
987}