1use serde_json::{Map, Value};
4
5use crate::config::{ObjectSync, Owner};
6use crate::model::{ChangeEnvelope, SourceSystem, payload_hash};
7
/// Execution lane chosen by the planner for a single change.
///
/// `Hash` is derived alongside `Eq` so that a lane can be used directly as a
/// `HashMap`/`HashSet` key (for example to group or count decisions by lane).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ApplyLane {
    /// Nothing to apply: the incoming change would not alter the current payload.
    Noop,
    /// The merge reported conflicting fields, so nothing is applied.
    Conflict,
    /// Chosen for urgent changes and for batches below the bulk threshold.
    /// NOTE(review): presumably maps to a per-record REST call — confirm with the executor.
    Rest,
    /// Chosen whenever the change has dependencies; takes priority over every other lane.
    CompositeGraph,
    /// Chosen for non-urgent, dependency-free changes whose batch size reaches the
    /// object's configured bulk minimum.
    Bulk,
}
22
23#[derive(Debug, Clone, PartialEq, Eq)]
25pub struct PlannerContext {
26 pub object: ObjectSync,
28 pub current_payload: Option<Value>,
30 pub batch_size: usize,
32 pub urgent: bool,
34 pub has_dependencies: bool,
36}
37
38#[derive(Debug, Clone, PartialEq, Eq)]
40pub struct PlanDecision {
41 pub lane: ApplyLane,
43 pub payload: Option<Value>,
45 pub conflicts: Vec<String>,
47}
48
49#[derive(Debug, Clone, PartialEq, Eq)]
51pub enum MergeOutcome {
52 Noop,
54 Merged(Value),
56 Conflict {
58 fields: Vec<String>,
60 },
61}
62
63#[must_use]
65pub fn merge_payload(
66 object: &ObjectSync,
67 current_payload: Option<&Value>,
68 source: SourceSystem,
69 incoming_payload: &Value,
70) -> MergeOutcome {
71 let Some(current_payload) = current_payload else {
72 return MergeOutcome::Merged(incoming_payload.clone());
73 };
74
75 if payload_hash(current_payload) == payload_hash(incoming_payload) {
76 return MergeOutcome::Noop;
77 }
78
79 match (current_payload, incoming_payload) {
80 (Value::Object(current), Value::Object(incoming)) => {
81 merge_object_payload(object, current, source, incoming)
82 }
83 _ if current_payload == incoming_payload => MergeOutcome::Noop,
84 _ => MergeOutcome::Conflict {
85 fields: vec!["<root>".to_string()],
86 },
87 }
88}
89
90#[must_use]
92pub fn plan_change(context: &PlannerContext, envelope: &ChangeEnvelope) -> PlanDecision {
93 if context
94 .current_payload
95 .as_ref()
96 .is_some_and(|current| envelope.payload_hash_matches(current))
97 {
98 return PlanDecision {
99 lane: ApplyLane::Noop,
100 payload: None,
101 conflicts: Vec::new(),
102 };
103 }
104
105 match merge_payload(
106 &context.object,
107 context.current_payload.as_ref(),
108 envelope.source(),
109 envelope.payload(),
110 ) {
111 MergeOutcome::Noop => PlanDecision {
112 lane: ApplyLane::Noop,
113 payload: None,
114 conflicts: Vec::new(),
115 },
116 MergeOutcome::Conflict { fields } => PlanDecision {
117 lane: ApplyLane::Conflict,
118 payload: None,
119 conflicts: fields,
120 },
121 MergeOutcome::Merged(payload) => PlanDecision {
122 lane: if context
123 .current_payload
124 .as_ref()
125 .is_some_and(|current| current == &payload)
126 {
127 ApplyLane::Noop
128 } else {
129 choose_lane(context)
130 },
131 payload: if context
132 .current_payload
133 .as_ref()
134 .is_some_and(|current| current == &payload)
135 {
136 None
137 } else {
138 Some(payload)
139 },
140 conflicts: Vec::new(),
141 },
142 }
143}
144
145const fn choose_lane(context: &PlannerContext) -> ApplyLane {
146 if context.has_dependencies {
147 return ApplyLane::CompositeGraph;
148 }
149
150 if context.urgent {
151 return ApplyLane::Rest;
152 }
153
154 if context.batch_size >= context.object.lane_thresholds().bulk_min_batch_size() {
155 ApplyLane::Bulk
156 } else {
157 ApplyLane::Rest
158 }
159}
160
161fn merge_object_payload(
162 object: &ObjectSync,
163 current: &Map<String, Value>,
164 source: SourceSystem,
165 incoming: &Map<String, Value>,
166) -> MergeOutcome {
167 let mut merged = current.clone();
168 let mut conflicts = Vec::new();
169
170 for (field, incoming_value) in incoming {
171 match merged.get(field) {
172 Some(existing_value) if existing_value == incoming_value => {}
173 Some(_existing_value) => match object.field_owner_for(field) {
174 Some(Owner::Salesforce) if source == SourceSystem::Salesforce => {
175 merged.insert(field.clone(), incoming_value.clone());
176 }
177 Some(Owner::Postgres) if source == SourceSystem::Postgres => {
178 merged.insert(field.clone(), incoming_value.clone());
179 }
180 Some(Owner::Salesforce | Owner::Postgres) => {}
181 Some(Owner::Shared) | None => conflicts.push(field.clone()),
182 },
183 None => match object.field_owner_for(field) {
184 Some(Owner::Salesforce) if source == SourceSystem::Salesforce => {
185 merged.insert(field.clone(), incoming_value.clone());
186 }
187 Some(Owner::Postgres) if source == SourceSystem::Postgres => {
188 merged.insert(field.clone(), incoming_value.clone());
189 }
190 Some(Owner::Salesforce | Owner::Postgres) => {}
191 Some(Owner::Shared) | None => {
192 merged.insert(field.clone(), incoming_value.clone());
193 }
194 },
195 }
196 }
197
198 if !conflicts.is_empty() {
199 return MergeOutcome::Conflict { fields: conflicts };
200 }
201
202 MergeOutcome::Merged(Value::Object(merged))
203}
204
/// Unit tests for `merge_payload` and `plan_change`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::identity::SyncKey;
    use crate::model::{ChangeEnvelope, ChangeOperation, SourceSystem};
    use chrono::Utc;
    use serde_json::json;

    /// `Account` sync configuration with no field ownership declared.
    fn test_object_sync() -> ObjectSync {
        ObjectSync::new("Account")
    }

    /// `Account` sync configuration with one field per ownership kind:
    /// `Name` (Salesforce), `Billing` (Postgres) and `Phone` (shared).
    fn test_object_sync_with_ownership() -> ObjectSync {
        ObjectSync::new("Account")
            .field_owner("Name", Owner::Salesforce)
            .field_owner("Billing", Owner::Postgres)
            .field_owner("Phone", Owner::Shared)
    }

    /// Builds an envelope for a fixed sync key, timestamped with the current time.
    fn test_envelope(
        source: SourceSystem,
        operation: ChangeOperation,
        payload: Value,
    ) -> ChangeEnvelope {
        let Ok(sync_key) = SyncKey::new("tenant", "Account", "ext-001") else {
            panic!("failed to create test sync key");
        };
        ChangeEnvelope::new(sync_key, source, operation, Utc::now(), payload)
    }

    /// Context for a single, non-urgent change without dependencies; tests
    /// override individual fields to exercise lane selection.
    fn test_context(object: &ObjectSync, current_payload: Option<Value>) -> PlannerContext {
        PlannerContext {
            object: object.clone(),
            current_payload,
            batch_size: 1,
            urgent: false,
            has_dependencies: false,
        }
    }

    // ---- merge_payload ----

    #[test]
    fn merge_payload_no_current_returns_incoming_as_merged() {
        let object = test_object_sync();
        let incoming = json!({"Name": "Acme"});

        let result = merge_payload(&object, None, SourceSystem::Salesforce, &incoming);
        assert_eq!(result, MergeOutcome::Merged(json!({"Name": "Acme"})));
    }

    #[test]
    fn merge_payload_identical_payloads_returns_noop() {
        let object = test_object_sync();
        let current = json!({"Name": "Acme"});
        let incoming = json!({"Name": "Acme"});

        let result = merge_payload(&object, Some(&current), SourceSystem::Salesforce, &incoming);
        assert_eq!(result, MergeOutcome::Noop);
    }

    #[test]
    fn merge_payload_non_object_mismatch_returns_conflict() {
        let object = test_object_sync();
        let current = json!("string_a");
        let incoming = json!("string_b");

        let result = merge_payload(&object, Some(&current), SourceSystem::Salesforce, &incoming);
        assert!(matches!(result, MergeOutcome::Conflict { fields } if fields == vec!["<root>"]));
    }

    #[test]
    fn merge_payload_non_object_identical_returns_noop() {
        let object = test_object_sync();
        let current = json!(42);
        let incoming = json!(42);

        let result = merge_payload(&object, Some(&current), SourceSystem::Salesforce, &incoming);
        assert_eq!(result, MergeOutcome::Noop);
    }

    #[test]
    fn merge_payload_new_field_no_ownership_is_added() {
        let object = test_object_sync();
        let current = json!({"Name": "Acme"});
        let incoming = json!({"Name": "Acme", "Phone": "555-1234"});

        let result = merge_payload(&object, Some(&current), SourceSystem::Salesforce, &incoming);
        assert_eq!(
            result,
            MergeOutcome::Merged(json!({"Name": "Acme", "Phone": "555-1234"}))
        );
    }

    #[test]
    fn merge_payload_salesforce_owned_field_accepted_from_salesforce() {
        let object = test_object_sync_with_ownership();
        let current = json!({"Name": "Old"});
        let incoming = json!({"Name": "New"});

        let result = merge_payload(&object, Some(&current), SourceSystem::Salesforce, &incoming);
        assert_eq!(result, MergeOutcome::Merged(json!({"Name": "New"})));
    }

    #[test]
    fn merge_payload_salesforce_owned_field_rejected_from_postgres() {
        let object = test_object_sync_with_ownership();
        let current = json!({"Name": "Acme"});
        let incoming = json!({"Name": "Changed"});

        // The non-owning source is ignored rather than reported as a conflict.
        let result = merge_payload(&object, Some(&current), SourceSystem::Postgres, &incoming);
        assert_eq!(result, MergeOutcome::Merged(json!({"Name": "Acme"})));
    }

    #[test]
    fn merge_payload_shared_field_conflict_on_change() {
        let object = test_object_sync_with_ownership();
        let current = json!({"Phone": "111"});
        let incoming = json!({"Phone": "222"});

        let result = merge_payload(&object, Some(&current), SourceSystem::Salesforce, &incoming);
        assert!(matches!(result, MergeOutcome::Conflict { fields } if fields == vec!["Phone"]));
    }

    #[test]
    fn merge_payload_null_field_in_incoming_with_ownership() {
        let object = ObjectSync::new("Account")
            .field_owner("Name", Owner::Salesforce)
            .field_owner("Phone", Owner::Salesforce);
        let current = json!({"Name": "Acme", "Phone": "555"});
        let incoming = json!({"Name": "Acme", "Phone": null});

        // An explicit null from the owning system overwrites the stored value.
        let result = merge_payload(&object, Some(&current), SourceSystem::Salesforce, &incoming);
        assert_eq!(
            result,
            MergeOutcome::Merged(json!({"Name": "Acme", "Phone": null}))
        );
    }

    #[test]
    fn merge_payload_empty_incoming_object_returns_current() {
        let object = test_object_sync();
        let current = json!({"Name": "Acme"});
        let incoming = json!({});

        let result = merge_payload(&object, Some(&current), SourceSystem::Salesforce, &incoming);
        assert_eq!(result, MergeOutcome::Merged(json!({"Name": "Acme"})));
    }

    #[test]
    fn merge_payload_postgres_owned_field_accepted_from_postgres() {
        let object = test_object_sync_with_ownership();
        let current = json!({"Billing": "Old Address"});
        let incoming = json!({"Billing": "New Address"});

        let result = merge_payload(&object, Some(&current), SourceSystem::Postgres, &incoming);
        assert_eq!(
            result,
            MergeOutcome::Merged(json!({"Billing": "New Address"}))
        );
    }

    #[test]
    fn merge_payload_postgres_owned_field_rejected_from_salesforce() {
        let object = test_object_sync_with_ownership();
        let current = json!({"Billing": "Original"});
        let incoming = json!({"Billing": "Overwritten"});

        let result = merge_payload(&object, Some(&current), SourceSystem::Salesforce, &incoming);
        assert_eq!(result, MergeOutcome::Merged(json!({"Billing": "Original"})));
    }

    #[test]
    fn merge_payload_new_field_salesforce_owned_added_from_salesforce() {
        let object = test_object_sync_with_ownership();
        let current = json!({});
        let incoming = json!({"Name": "Added"});

        let result = merge_payload(&object, Some(&current), SourceSystem::Salesforce, &incoming);
        assert_eq!(result, MergeOutcome::Merged(json!({"Name": "Added"})));
    }

    #[test]
    fn merge_payload_new_field_salesforce_owned_ignored_from_postgres() {
        let object = test_object_sync_with_ownership();
        let current = json!({});
        let incoming = json!({"Name": "ShouldBeIgnored"});

        let result = merge_payload(&object, Some(&current), SourceSystem::Postgres, &incoming);
        assert_eq!(result, MergeOutcome::Merged(json!({})));
    }

    #[test]
    fn merge_payload_new_field_postgres_owned_ignored_from_salesforce() {
        let object = test_object_sync_with_ownership();
        let current = json!({});
        let incoming = json!({"Billing": "ShouldBeIgnored"});

        let result = merge_payload(&object, Some(&current), SourceSystem::Salesforce, &incoming);
        assert_eq!(result, MergeOutcome::Merged(json!({})));
    }

    #[test]
    fn merge_payload_new_shared_field_is_added() {
        let object = test_object_sync_with_ownership();
        let current = json!({});
        let incoming = json!({"Phone": "555-0100"});

        let result = merge_payload(&object, Some(&current), SourceSystem::Salesforce, &incoming);
        assert_eq!(result, MergeOutcome::Merged(json!({"Phone": "555-0100"})));
    }

    #[test]
    fn merge_payload_multiple_conflicts_collected() {
        let object = ObjectSync::new("Account")
            .field_owner("A", Owner::Shared)
            .field_owner("B", Owner::Shared);
        let current = json!({"A": 1, "B": 2});
        let incoming = json!({"A": 10, "B": 20});

        let result = merge_payload(&object, Some(&current), SourceSystem::Salesforce, &incoming);
        match result {
            MergeOutcome::Conflict { fields } => {
                assert!(fields.contains(&"A".to_string()));
                assert!(fields.contains(&"B".to_string()));
                assert_eq!(fields.len(), 2);
            }
            other => panic!("expected Conflict, got {other:?}"),
        }
    }

    #[test]
    fn merge_payload_null_field_in_incoming_no_ownership() {
        let object = test_object_sync();
        let current = json!({"Name": "Acme", "Phone": "555"});
        let incoming = json!({"Name": "Acme", "Phone": null});

        // Without an owner, a changed value (even null) is a conflict.
        let result = merge_payload(&object, Some(&current), SourceSystem::Salesforce, &incoming);
        assert!(matches!(result, MergeOutcome::Conflict { fields } if fields == vec!["Phone"]));
    }

    // ---- plan_change ----

    #[test]
    fn plan_change_delete_no_current_routes_to_rest() {
        let object = test_object_sync();
        let envelope = test_envelope(SourceSystem::Salesforce, ChangeOperation::Delete, json!({}));
        let ctx = test_context(&object, None);

        let decision = plan_change(&ctx, &envelope);
        assert_eq!(decision.lane, ApplyLane::Rest);
    }

    #[test]
    fn plan_change_identical_payload_returns_noop() {
        let object = test_object_sync();
        let payload = json!({"Name": "Acme"});
        let envelope = test_envelope(
            SourceSystem::Salesforce,
            ChangeOperation::Upsert,
            payload.clone(),
        );
        let ctx = test_context(&object, Some(payload));

        let decision = plan_change(&ctx, &envelope);
        assert_eq!(decision.lane, ApplyLane::Noop);
        assert!(decision.payload.is_none());
        assert!(decision.conflicts.is_empty());
    }

    #[test]
    fn plan_change_new_record_routes_to_rest() {
        let object = test_object_sync();
        let envelope = test_envelope(
            SourceSystem::Postgres,
            ChangeOperation::Upsert,
            json!({"Name": "New"}),
        );
        let ctx = test_context(&object, None);

        let decision = plan_change(&ctx, &envelope);
        assert_eq!(decision.lane, ApplyLane::Rest);
        assert_eq!(decision.payload, Some(json!({"Name": "New"})));
    }

    #[test]
    fn plan_change_urgent_routes_to_rest() {
        let object = ObjectSync::new("Account").field_owner("Name", Owner::Postgres);
        let envelope = test_envelope(
            SourceSystem::Postgres,
            ChangeOperation::Upsert,
            json!({"Name": "Updated"}),
        );
        let mut ctx = test_context(&object, Some(json!({"Name": "Old"})));
        ctx.urgent = true;

        let decision = plan_change(&ctx, &envelope);
        assert_eq!(decision.lane, ApplyLane::Rest);
    }

    #[test]
    fn plan_change_with_dependencies_routes_to_composite_graph() {
        let object = ObjectSync::new("Account").field_owner("Name", Owner::Postgres);
        let envelope = test_envelope(
            SourceSystem::Postgres,
            ChangeOperation::Upsert,
            json!({"Name": "Updated"}),
        );
        let mut ctx = test_context(&object, Some(json!({"Name": "Old"})));
        ctx.has_dependencies = true;

        let decision = plan_change(&ctx, &envelope);
        assert_eq!(decision.lane, ApplyLane::CompositeGraph);
    }

    #[test]
    fn plan_change_large_batch_routes_to_bulk() {
        let object = ObjectSync::new("Account").field_owner("Name", Owner::Postgres);
        let envelope = test_envelope(
            SourceSystem::Postgres,
            ChangeOperation::Upsert,
            json!({"Name": "Updated"}),
        );
        let mut ctx = test_context(&object, Some(json!({"Name": "Old"})));
        ctx.batch_size = 1000;

        let decision = plan_change(&ctx, &envelope);
        assert_eq!(decision.lane, ApplyLane::Bulk);
    }

    #[test]
    fn plan_change_conflict_returns_conflict_lane() {
        let object = test_object_sync_with_ownership();
        let envelope = test_envelope(
            SourceSystem::Salesforce,
            ChangeOperation::Upsert,
            json!({"Phone": "new-value"}),
        );
        let ctx = test_context(&object, Some(json!({"Phone": "old-value"})));

        let decision = plan_change(&ctx, &envelope);
        assert_eq!(decision.lane, ApplyLane::Conflict);
        assert!(decision.payload.is_none());
        assert!(!decision.conflicts.is_empty());
    }

    #[test]
    fn plan_change_merged_equals_current_returns_noop() {
        let object = test_object_sync_with_ownership();
        let current = json!({"Name": "Acme", "Billing": "123 Main"});
        let incoming = json!({"Name": "Changed"});
        // Postgres does not own `Name`, so the merge leaves the payload unchanged.
        let envelope = test_envelope(SourceSystem::Postgres, ChangeOperation::Upsert, incoming);
        let ctx = test_context(&object, Some(current));

        let decision = plan_change(&ctx, &envelope);
        assert_eq!(decision.lane, ApplyLane::Noop);
        assert!(decision.payload.is_none());
    }

    // NOTE(review): the two threshold tests below assume the default bulk
    // minimum batch size is 500 — confirm against the lane threshold defaults.

    #[test]
    fn plan_change_batch_below_bulk_threshold_routes_to_rest() {
        let object = ObjectSync::new("Account").field_owner("Name", Owner::Postgres);
        let envelope = test_envelope(
            SourceSystem::Postgres,
            ChangeOperation::Upsert,
            json!({"Name": "Updated"}),
        );
        let mut ctx = test_context(&object, Some(json!({"Name": "Old"})));
        ctx.batch_size = 499;

        let decision = plan_change(&ctx, &envelope);
        assert_eq!(decision.lane, ApplyLane::Rest);
    }

    #[test]
    fn plan_change_batch_at_bulk_threshold_routes_to_bulk() {
        let object = ObjectSync::new("Account").field_owner("Name", Owner::Postgres);
        let envelope = test_envelope(
            SourceSystem::Postgres,
            ChangeOperation::Upsert,
            json!({"Name": "Updated"}),
        );
        let mut ctx = test_context(&object, Some(json!({"Name": "Old"})));
        ctx.batch_size = 500;

        let decision = plan_change(&ctx, &envelope);
        assert_eq!(decision.lane, ApplyLane::Bulk);
    }

    #[test]
    fn plan_change_dependencies_take_priority_over_bulk() {
        let object = ObjectSync::new("Account").field_owner("Name", Owner::Postgres);
        let envelope = test_envelope(
            SourceSystem::Postgres,
            ChangeOperation::Upsert,
            json!({"Name": "Updated"}),
        );
        let mut ctx = test_context(&object, Some(json!({"Name": "Old"})));
        ctx.batch_size = 1000;
        ctx.has_dependencies = true;

        let decision = plan_change(&ctx, &envelope);
        assert_eq!(decision.lane, ApplyLane::CompositeGraph);
    }

    #[test]
    fn plan_change_dependencies_take_priority_over_urgent() {
        let object = ObjectSync::new("Account").field_owner("Name", Owner::Postgres);
        let envelope = test_envelope(
            SourceSystem::Postgres,
            ChangeOperation::Upsert,
            json!({"Name": "Updated"}),
        );
        let mut ctx = test_context(&object, Some(json!({"Name": "Old"})));
        ctx.urgent = true;
        ctx.has_dependencies = true;

        let decision = plan_change(&ctx, &envelope);
        assert_eq!(decision.lane, ApplyLane::CompositeGraph);
    }

    #[test]
    fn plan_change_urgent_takes_priority_over_bulk() {
        let object = ObjectSync::new("Account").field_owner("Name", Owner::Postgres);
        let envelope = test_envelope(
            SourceSystem::Postgres,
            ChangeOperation::Upsert,
            json!({"Name": "Updated"}),
        );
        let mut ctx = test_context(&object, Some(json!({"Name": "Old"})));
        ctx.batch_size = 1000;
        ctx.urgent = true;

        let decision = plan_change(&ctx, &envelope);
        assert_eq!(decision.lane, ApplyLane::Rest);
    }

    #[test]
    fn plan_change_semantically_equal_payload_returns_noop() {
        let object = test_object_sync();
        // Same fields and values, different key order.
        let current = json!({"A": 1, "B": 2});
        let incoming = json!({"B": 2, "A": 1});
        let envelope = test_envelope(SourceSystem::Postgres, ChangeOperation::Upsert, incoming);
        let ctx = test_context(&object, Some(current));

        let decision = plan_change(&ctx, &envelope);
        assert_eq!(decision.lane, ApplyLane::Noop);
    }
}