Skip to main content

force_sync/
plan.rs

1//! Pure planner and merge logic for sync envelopes.
2
3use serde_json::{Map, Value};
4
5use crate::config::{ObjectSync, Owner};
6use crate::model::{ChangeEnvelope, SourceSystem, payload_hash};
7
/// The route (or non-route) the planner assigns to a change.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ApplyLane {
    /// Nothing needs to be written: the incoming change already matches the current state.
    Noop,
    /// The change could not be resolved automatically and needs an operator or a later pass.
    Conflict,
    /// Write the change via the Salesforce REST API.
    Rest,
    /// Write the change via Composite Graph, used when dependent records are involved.
    CompositeGraph,
    /// Write the change via the bulk transport.
    Bulk,
}
22
23/// The pure inputs required to plan a change.
24#[derive(Debug, Clone, PartialEq, Eq)]
25pub struct PlannerContext {
26    /// Object-level sync configuration.
27    pub object: ObjectSync,
28    /// The current canonical payload, if one is already known.
29    pub current_payload: Option<Value>,
30    /// The number of records being planned together.
31    pub batch_size: usize,
32    /// Whether the record should favor the fast path over batch economics.
33    pub urgent: bool,
34    /// Whether the change depends on related records being applied together.
35    pub has_dependencies: bool,
36}
37
38/// The final planner decision after merge and lane selection.
39#[derive(Debug, Clone, PartialEq, Eq)]
40pub struct PlanDecision {
41    /// The selected apply lane.
42    pub lane: ApplyLane,
43    /// The merged payload to apply, if any.
44    pub payload: Option<Value>,
45    /// Fields that could not be resolved automatically.
46    pub conflicts: Vec<String>,
47}
48
49/// The result of merging payloads before lane selection.
50#[derive(Debug, Clone, PartialEq, Eq)]
51pub enum MergeOutcome {
52    /// No material change was detected.
53    Noop,
54    /// The payload was merged successfully.
55    Merged(Value),
56    /// The merge could not be resolved automatically.
57    Conflict {
58        /// The fields that caused the conflict.
59        fields: Vec<String>,
60    },
61}
62
63/// Merges an incoming payload into the current payload using field ownership rules.
64#[must_use]
65pub fn merge_payload(
66    object: &ObjectSync,
67    current_payload: Option<&Value>,
68    source: SourceSystem,
69    incoming_payload: &Value,
70) -> MergeOutcome {
71    let Some(current_payload) = current_payload else {
72        return MergeOutcome::Merged(incoming_payload.clone());
73    };
74
75    if payload_hash(current_payload) == payload_hash(incoming_payload) {
76        return MergeOutcome::Noop;
77    }
78
79    match (current_payload, incoming_payload) {
80        (Value::Object(current), Value::Object(incoming)) => {
81            merge_object_payload(object, current, source, incoming)
82        }
83        _ if current_payload == incoming_payload => MergeOutcome::Noop,
84        _ => MergeOutcome::Conflict {
85            fields: vec!["<root>".to_string()],
86        },
87    }
88}
89
90/// Plans how a change should be applied after merge resolution.
91#[must_use]
92pub fn plan_change(context: &PlannerContext, envelope: &ChangeEnvelope) -> PlanDecision {
93    if context
94        .current_payload
95        .as_ref()
96        .is_some_and(|current| envelope.payload_hash_matches(current))
97    {
98        return PlanDecision {
99            lane: ApplyLane::Noop,
100            payload: None,
101            conflicts: Vec::new(),
102        };
103    }
104
105    match merge_payload(
106        &context.object,
107        context.current_payload.as_ref(),
108        envelope.source(),
109        envelope.payload(),
110    ) {
111        MergeOutcome::Noop => PlanDecision {
112            lane: ApplyLane::Noop,
113            payload: None,
114            conflicts: Vec::new(),
115        },
116        MergeOutcome::Conflict { fields } => PlanDecision {
117            lane: ApplyLane::Conflict,
118            payload: None,
119            conflicts: fields,
120        },
121        MergeOutcome::Merged(payload) => PlanDecision {
122            lane: if context
123                .current_payload
124                .as_ref()
125                .is_some_and(|current| current == &payload)
126            {
127                ApplyLane::Noop
128            } else {
129                choose_lane(context)
130            },
131            payload: if context
132                .current_payload
133                .as_ref()
134                .is_some_and(|current| current == &payload)
135            {
136                None
137            } else {
138                Some(payload)
139            },
140            conflicts: Vec::new(),
141        },
142    }
143}
144
145const fn choose_lane(context: &PlannerContext) -> ApplyLane {
146    if context.has_dependencies {
147        return ApplyLane::CompositeGraph;
148    }
149
150    if context.urgent {
151        return ApplyLane::Rest;
152    }
153
154    if context.batch_size >= context.object.lane_thresholds().bulk_min_batch_size() {
155        ApplyLane::Bulk
156    } else {
157        ApplyLane::Rest
158    }
159}
160
161fn merge_object_payload(
162    object: &ObjectSync,
163    current: &Map<String, Value>,
164    source: SourceSystem,
165    incoming: &Map<String, Value>,
166) -> MergeOutcome {
167    // ⚡ Bolt: Delay cloning `current` until an actual mutation is needed.
168    // If all incoming fields already match or are ignored, we avoid cloning the entire map.
169    let mut merged: Option<Map<String, Value>> = None;
170    let mut conflicts = Vec::new();
171
172    for (field, incoming_value) in incoming {
173        match current.get(field) {
174            Some(existing_value) if existing_value == incoming_value => {}
175            Some(_) => match object.field_owner_for(field) {
176                Some(Owner::Salesforce) if source == SourceSystem::Salesforce => {
177                    merged
178                        .get_or_insert_with(|| current.clone())
179                        .insert(field.clone(), incoming_value.clone());
180                }
181                Some(Owner::Postgres) if source == SourceSystem::Postgres => {
182                    merged
183                        .get_or_insert_with(|| current.clone())
184                        .insert(field.clone(), incoming_value.clone());
185                }
186                Some(Owner::Salesforce | Owner::Postgres) => {}
187                Some(Owner::Shared) | None => conflicts.push(field.clone()),
188            },
189            None => match object.field_owner_for(field) {
190                Some(Owner::Salesforce) if source == SourceSystem::Salesforce => {
191                    merged
192                        .get_or_insert_with(|| current.clone())
193                        .insert(field.clone(), incoming_value.clone());
194                }
195                Some(Owner::Postgres) if source == SourceSystem::Postgres => {
196                    merged
197                        .get_or_insert_with(|| current.clone())
198                        .insert(field.clone(), incoming_value.clone());
199                }
200                Some(Owner::Salesforce | Owner::Postgres) => {}
201                Some(Owner::Shared) | None => {
202                    merged
203                        .get_or_insert_with(|| current.clone())
204                        .insert(field.clone(), incoming_value.clone());
205                }
206            },
207        }
208    }
209
210    let merged = merged.unwrap_or_else(|| current.clone());
211
212    if !conflicts.is_empty() {
213        return MergeOutcome::Conflict { fields: conflicts };
214    }
215
216    MergeOutcome::Merged(Value::Object(merged))
217}
218
#[cfg(test)]
mod tests {
    use super::*;
    use crate::identity::SyncKey;
    use crate::model::{ChangeEnvelope, ChangeOperation, SourceSystem};
    use chrono::Utc;
    use serde_json::json;

    // ── fixtures ──────────────────────────────────────────────────────

    /// An `Account` object with no field ownership configured.
    fn unowned_account() -> ObjectSync {
        ObjectSync::new("Account")
    }

    /// An `Account` object with one field per ownership kind.
    fn owned_account() -> ObjectSync {
        ObjectSync::new("Account")
            .field_owner("Name", Owner::Salesforce)
            .field_owner("Billing", Owner::Postgres)
            .field_owner("Phone", Owner::Shared)
    }

    /// An `Account` object whose `Name` field is owned by Postgres.
    fn postgres_named_account() -> ObjectSync {
        ObjectSync::new("Account").field_owner("Name", Owner::Postgres)
    }

    /// Builds an envelope for a fixed sync key, stamped with the current time.
    fn envelope_from(
        source: SourceSystem,
        operation: ChangeOperation,
        payload: Value,
    ) -> ChangeEnvelope {
        let sync_key = SyncKey::new("tenant", "Account", "ext-001")
            .unwrap_or_else(|_| panic!("failed to create test sync key"));
        ChangeEnvelope::new(sync_key, source, operation, Utc::now(), payload)
    }

    /// A single-record, non-urgent, dependency-free planner context.
    fn context_for(object: &ObjectSync, current_payload: Option<Value>) -> PlannerContext {
        PlannerContext {
            object: object.clone(),
            current_payload,
            batch_size: 1,
            urgent: false,
            has_dependencies: false,
        }
    }

    /// Merges `incoming` onto an existing `current` payload.
    fn merge_onto(
        object: &ObjectSync,
        current: Value,
        source: SourceSystem,
        incoming: Value,
    ) -> MergeOutcome {
        merge_payload(object, Some(&current), source, &incoming)
    }

    /// Shorthand for a conflict outcome on the given field names.
    fn conflict_on(fields: &[&str]) -> MergeOutcome {
        MergeOutcome::Conflict {
            fields: fields.iter().map(|name| (*name).to_string()).collect(),
        }
    }

    /// Plans a Postgres upsert renaming `Name` from "Old" to "Updated" on an
    /// object whose `Name` is Postgres-owned, after `tweak` adjusts the context.
    fn plan_name_update(tweak: impl FnOnce(&mut PlannerContext)) -> PlanDecision {
        let object = postgres_named_account();
        let envelope = envelope_from(
            SourceSystem::Postgres,
            ChangeOperation::Upsert,
            json!({"Name": "Updated"}),
        );
        let mut ctx = context_for(&object, Some(json!({"Name": "Old"})));
        tweak(&mut ctx);
        plan_change(&ctx, &envelope)
    }

    // ── merge_payload ─────────────────────────────────────────────────

    #[test]
    fn merge_payload_no_current_returns_incoming_as_merged() {
        let outcome = merge_payload(
            &unowned_account(),
            None,
            SourceSystem::Salesforce,
            &json!({"Name": "Acme"}),
        );
        assert_eq!(outcome, MergeOutcome::Merged(json!({"Name": "Acme"})));
    }

    #[test]
    fn merge_payload_identical_payloads_returns_noop() {
        let outcome = merge_onto(
            &unowned_account(),
            json!({"Name": "Acme"}),
            SourceSystem::Salesforce,
            json!({"Name": "Acme"}),
        );
        assert_eq!(outcome, MergeOutcome::Noop);
    }

    #[test]
    fn merge_payload_non_object_mismatch_returns_conflict() {
        let outcome = merge_onto(
            &unowned_account(),
            json!("string_a"),
            SourceSystem::Salesforce,
            json!("string_b"),
        );
        assert_eq!(outcome, conflict_on(&["<root>"]));
    }

    #[test]
    fn merge_payload_non_object_identical_returns_noop() {
        let outcome = merge_onto(
            &unowned_account(),
            json!(42),
            SourceSystem::Salesforce,
            json!(42),
        );
        assert_eq!(outcome, MergeOutcome::Noop);
    }

    #[test]
    fn merge_payload_new_field_no_ownership_is_added() {
        let outcome = merge_onto(
            &unowned_account(),
            json!({"Name": "Acme"}),
            SourceSystem::Salesforce,
            json!({"Name": "Acme", "Phone": "555-1234"}),
        );
        assert_eq!(
            outcome,
            MergeOutcome::Merged(json!({"Name": "Acme", "Phone": "555-1234"}))
        );
    }

    #[test]
    fn merge_payload_salesforce_owned_field_accepted_from_salesforce() {
        let outcome = merge_onto(
            &owned_account(),
            json!({"Name": "Old"}),
            SourceSystem::Salesforce,
            json!({"Name": "New"}),
        );
        assert_eq!(outcome, MergeOutcome::Merged(json!({"Name": "New"})));
    }

    #[test]
    fn merge_payload_salesforce_owned_field_rejected_from_postgres() {
        let outcome = merge_onto(
            &owned_account(),
            json!({"Name": "Acme"}),
            SourceSystem::Postgres,
            json!({"Name": "Changed"}),
        );
        assert_eq!(outcome, MergeOutcome::Merged(json!({"Name": "Acme"})));
    }

    #[test]
    fn merge_payload_shared_field_conflict_on_change() {
        let outcome = merge_onto(
            &owned_account(),
            json!({"Phone": "111"}),
            SourceSystem::Salesforce,
            json!({"Phone": "222"}),
        );
        assert_eq!(outcome, conflict_on(&["Phone"]));
    }

    #[test]
    fn merge_payload_null_field_in_incoming_with_ownership() {
        let object = ObjectSync::new("Account")
            .field_owner("Name", Owner::Salesforce)
            .field_owner("Phone", Owner::Salesforce);

        let outcome = merge_onto(
            &object,
            json!({"Name": "Acme", "Phone": "555"}),
            SourceSystem::Salesforce,
            json!({"Name": "Acme", "Phone": null}),
        );
        assert_eq!(
            outcome,
            MergeOutcome::Merged(json!({"Name": "Acme", "Phone": null}))
        );
    }

    #[test]
    fn merge_payload_empty_incoming_object_returns_current() {
        let outcome = merge_onto(
            &unowned_account(),
            json!({"Name": "Acme"}),
            SourceSystem::Salesforce,
            json!({}),
        );
        assert_eq!(outcome, MergeOutcome::Merged(json!({"Name": "Acme"})));
    }

    #[test]
    fn merge_payload_postgres_owned_field_accepted_from_postgres() {
        let outcome = merge_onto(
            &owned_account(),
            json!({"Billing": "Old Address"}),
            SourceSystem::Postgres,
            json!({"Billing": "New Address"}),
        );
        assert_eq!(
            outcome,
            MergeOutcome::Merged(json!({"Billing": "New Address"}))
        );
    }

    #[test]
    fn merge_payload_postgres_owned_field_rejected_from_salesforce() {
        let outcome = merge_onto(
            &owned_account(),
            json!({"Billing": "Original"}),
            SourceSystem::Salesforce,
            json!({"Billing": "Overwritten"}),
        );
        assert_eq!(outcome, MergeOutcome::Merged(json!({"Billing": "Original"})));
    }

    #[test]
    fn merge_payload_new_field_salesforce_owned_added_from_salesforce() {
        let outcome = merge_onto(
            &owned_account(),
            json!({}),
            SourceSystem::Salesforce,
            json!({"Name": "Added"}),
        );
        assert_eq!(outcome, MergeOutcome::Merged(json!({"Name": "Added"})));
    }

    #[test]
    fn merge_payload_new_field_salesforce_owned_ignored_from_postgres() {
        let outcome = merge_onto(
            &owned_account(),
            json!({}),
            SourceSystem::Postgres,
            json!({"Name": "ShouldBeIgnored"}),
        );
        assert_eq!(outcome, MergeOutcome::Merged(json!({})));
    }

    #[test]
    fn merge_payload_new_field_postgres_owned_ignored_from_salesforce() {
        let outcome = merge_onto(
            &owned_account(),
            json!({}),
            SourceSystem::Salesforce,
            json!({"Billing": "ShouldBeIgnored"}),
        );
        assert_eq!(outcome, MergeOutcome::Merged(json!({})));
    }

    #[test]
    fn merge_payload_new_shared_field_is_added() {
        let outcome = merge_onto(
            &owned_account(),
            json!({}),
            SourceSystem::Salesforce,
            json!({"Phone": "555-0100"}),
        );
        assert_eq!(outcome, MergeOutcome::Merged(json!({"Phone": "555-0100"})));
    }

    #[test]
    fn merge_payload_multiple_conflicts_collected() {
        let object = ObjectSync::new("Account")
            .field_owner("A", Owner::Shared)
            .field_owner("B", Owner::Shared);

        let outcome = merge_onto(
            &object,
            json!({"A": 1, "B": 2}),
            SourceSystem::Salesforce,
            json!({"A": 10, "B": 20}),
        );
        match outcome {
            MergeOutcome::Conflict { mut fields } => {
                // Order of reported conflicts is not part of the contract.
                fields.sort();
                assert_eq!(fields, ["A", "B"]);
            }
            other => panic!("expected Conflict, got {other:?}"),
        }
    }

    #[test]
    fn merge_payload_null_field_in_incoming_no_ownership() {
        let outcome = merge_onto(
            &unowned_account(),
            json!({"Name": "Acme", "Phone": "555"}),
            SourceSystem::Salesforce,
            json!({"Name": "Acme", "Phone": null}),
        );
        assert_eq!(outcome, conflict_on(&["Phone"]));
    }

    // ── plan_change ───────────────────────────────────────────────────

    #[test]
    fn plan_change_delete_no_current_routes_to_rest() {
        let object = unowned_account();
        let envelope = envelope_from(SourceSystem::Salesforce, ChangeOperation::Delete, json!({}));

        let decision = plan_change(&context_for(&object, None), &envelope);
        assert_eq!(decision.lane, ApplyLane::Rest);
    }

    #[test]
    fn plan_change_identical_payload_returns_noop() {
        let object = unowned_account();
        let payload = json!({"Name": "Acme"});
        let envelope = envelope_from(
            SourceSystem::Salesforce,
            ChangeOperation::Upsert,
            payload.clone(),
        );

        let decision = plan_change(&context_for(&object, Some(payload)), &envelope);
        assert_eq!(decision.lane, ApplyLane::Noop);
        assert_eq!(decision.payload, None);
        assert_eq!(decision.conflicts.len(), 0);
    }

    #[test]
    fn plan_change_new_record_routes_to_rest() {
        let object = unowned_account();
        let envelope = envelope_from(
            SourceSystem::Postgres,
            ChangeOperation::Upsert,
            json!({"Name": "New"}),
        );

        let decision = plan_change(&context_for(&object, None), &envelope);
        assert_eq!(decision.lane, ApplyLane::Rest);
        assert_eq!(decision.payload, Some(json!({"Name": "New"})));
    }

    #[test]
    fn plan_change_urgent_routes_to_rest() {
        let decision = plan_name_update(|ctx| ctx.urgent = true);
        assert_eq!(decision.lane, ApplyLane::Rest);
    }

    #[test]
    fn plan_change_with_dependencies_routes_to_composite_graph() {
        let decision = plan_name_update(|ctx| ctx.has_dependencies = true);
        assert_eq!(decision.lane, ApplyLane::CompositeGraph);
    }

    #[test]
    fn plan_change_large_batch_routes_to_bulk() {
        let decision = plan_name_update(|ctx| ctx.batch_size = 1000);
        assert_eq!(decision.lane, ApplyLane::Bulk);
    }

    #[test]
    fn plan_change_conflict_returns_conflict_lane() {
        let object = owned_account();
        let envelope = envelope_from(
            SourceSystem::Salesforce,
            ChangeOperation::Upsert,
            json!({"Phone": "new-value"}),
        );
        let ctx = context_for(&object, Some(json!({"Phone": "old-value"})));

        let decision = plan_change(&ctx, &envelope);
        assert_eq!(decision.lane, ApplyLane::Conflict);
        assert_eq!(decision.payload, None);
        assert_ne!(decision.conflicts.len(), 0);
    }

    #[test]
    fn plan_change_merged_equals_current_returns_noop() {
        let object = owned_account();
        let envelope = envelope_from(
            SourceSystem::Postgres,
            ChangeOperation::Upsert,
            json!({"Name": "Changed"}),
        );
        let ctx = context_for(
            &object,
            Some(json!({"Name": "Acme", "Billing": "123 Main"})),
        );

        let decision = plan_change(&ctx, &envelope);
        assert_eq!(decision.lane, ApplyLane::Noop);
        assert_eq!(decision.payload, None);
    }

    #[test]
    fn plan_change_batch_below_bulk_threshold_routes_to_rest() {
        let decision = plan_name_update(|ctx| ctx.batch_size = 499);
        assert_eq!(decision.lane, ApplyLane::Rest);
    }

    #[test]
    fn plan_change_batch_at_bulk_threshold_routes_to_bulk() {
        let decision = plan_name_update(|ctx| ctx.batch_size = 500);
        assert_eq!(decision.lane, ApplyLane::Bulk);
    }

    #[test]
    fn plan_change_dependencies_take_priority_over_bulk() {
        let decision = plan_name_update(|ctx| {
            ctx.batch_size = 1000;
            ctx.has_dependencies = true;
        });
        assert_eq!(decision.lane, ApplyLane::CompositeGraph);
    }

    #[test]
    fn plan_change_dependencies_take_priority_over_urgent() {
        let decision = plan_name_update(|ctx| {
            ctx.urgent = true;
            ctx.has_dependencies = true;
        });
        assert_eq!(decision.lane, ApplyLane::CompositeGraph);
    }

    #[test]
    fn plan_change_urgent_takes_priority_over_bulk() {
        let decision = plan_name_update(|ctx| {
            ctx.batch_size = 1000;
            ctx.urgent = true;
        });
        assert_eq!(decision.lane, ApplyLane::Rest);
    }

    #[test]
    fn plan_change_semantically_equal_payload_returns_noop() {
        let object = unowned_account();
        // Same keys and values, different key order.
        let envelope = envelope_from(
            SourceSystem::Postgres,
            ChangeOperation::Upsert,
            json!({"B": 2, "A": 1}),
        );
        let ctx = context_for(&object, Some(json!({"A": 1, "B": 2})));

        let decision = plan_change(&ctx, &envelope);
        assert_eq!(decision.lane, ApplyLane::Noop);
    }
}