engine/policy.rs
1//! Configuration vocabulary for a run: how changes are grouped into a flush
2//! ([`BatchPolicy`]) and how item-level sink rejections are handled
3//! ([`FailurePolicies`] over [`FailurePolicy`]).
4
5use std::collections::HashMap;
6use std::time::Duration;
7
8// The policy *vocabulary* (`Stop`/`Skip`) is a config-domain type, so it lives
9// in `schema-core` and is re-exported here for engine callers.
10pub use schema_core::FailurePolicy;
11
12/// How the worker groups changes into one sink flush.
13///
14/// Batching trades a little latency for far fewer round-trips: up to
15/// `max_changes` changes (or whatever has arrived after `max_delay`, whichever
16/// comes first) are buffered and flushed together. `max_changes: 1` reproduces
17/// the original flush-per-change behavior.
18///
19/// Acks respect the batch boundary — see the [module docs](crate). The source
20/// ack for a change is confirmed only after the flush that made its documents
21/// durable, so at-least-once delivery holds regardless of batch size.
22#[derive(Debug, Clone, Copy)]
23pub struct BatchPolicy {
24 /// Flush once this many changes have accumulated. Clamped to at least 1.
25 pub max_changes: usize,
26 /// Flush a partial batch this long after its first change, so a trickle of
27 /// changes still lands promptly instead of waiting for a full batch.
28 pub max_delay: Duration,
29}
30
31impl Default for BatchPolicy {
32 fn default() -> Self {
33 Self {
34 max_changes: 256,
35 max_delay: Duration::from_millis(50),
36 }
37 }
38}
39
40/// How the engine resolves the [`FailurePolicy`] for a rejected document: a
41/// global `default` plus per-index overrides, keyed by **logical** index name.
42///
43/// The engine governs only *item-level rejections* (a sink accepted the batch
44/// but refused specific documents). Transport failures, a source decode error,
45/// or a flush returning `Err` always stop the run regardless of this.
46#[derive(Debug, Clone, Default)]
47pub struct FailurePolicies {
48 default: FailurePolicy,
49 overrides: HashMap<String, FailurePolicy>,
50}
51
52impl FailurePolicies {
53 /// A policy set with `default` applied to every index and no overrides.
54 pub fn new(default: FailurePolicy) -> Self {
55 Self {
56 default,
57 overrides: HashMap::new(),
58 }
59 }
60
61 /// Override the policy for one logical index, leaving others on the default.
62 pub fn with_override(mut self, index: impl Into<String>, policy: FailurePolicy) -> Self {
63 self.overrides.insert(index.into(), policy);
64 self
65 }
66
67 /// The effective policy for `index` (its override, else the default).
68 pub fn resolve(&self, index: &str) -> FailurePolicy {
69 self.overrides.get(index).copied().unwrap_or(self.default)
70 }
71}