Skip to main content

engine/
policy.rs

1//! Configuration vocabulary for a run: how changes are grouped into a flush
2//! ([`BatchPolicy`]) and how item-level sink rejections are handled
3//! ([`FailurePolicies`] over [`FailurePolicy`]).
4
5use std::collections::HashMap;
6use std::time::Duration;
7
8pub use schema_core::FailurePolicy;
9
/// Batching knobs: when the worker stops buffering changes and hands them to
/// the sink as a single flush.
///
/// A flush fires on whichever limit is reached first — `max_changes` buffered
/// changes, or `max_delay` elapsed — paying a small amount of latency to cut
/// the number of round-trips substantially. Setting `max_changes` to 1 brings
/// back the original behavior of flushing after every change.
///
/// Batch boundaries are also ack boundaries (see the [module docs](crate)): a
/// change's source ack is confirmed only once the flush that made its
/// documents durable has completed, so delivery stays at-least-once whatever
/// the batch size.
#[derive(Debug, Clone, Copy)]
pub struct BatchPolicy {
    /// Size trigger: flush as soon as this many changes are buffered.
    ///
    /// Values below 1 are clamped to 1. The clamp is not applied by this type
    /// (the field is public and stored verbatim), so presumably it happens at
    /// the point of use.
    pub max_changes: usize,
    /// Time trigger: flush a partial batch once this long has passed since its
    /// first change, so a slow trickle is not held back waiting for the batch
    /// to fill.
    pub max_delay: Duration,
}

impl Default for BatchPolicy {
    /// Batches of up to 256 changes, flushed no later than 50 ms after the
    /// first change of the batch.
    fn default() -> Self {
        let max_changes = 256;
        let max_delay = Duration::from_millis(50);
        BatchPolicy {
            max_changes,
            max_delay,
        }
    }
}
37
38/// How the engine resolves the [`FailurePolicy`] for a rejected document: a
39/// global `default` plus per-index overrides, keyed by **logical** index name.
40///
41/// The engine governs only *item-level rejections* (a sink accepted the batch
42/// but refused specific documents). Transport failures, a source decode error,
43/// or a flush returning `Err` always stop the run regardless of this.
44#[derive(Debug, Clone, Default)]
45pub struct FailurePolicies {
46    default: FailurePolicy,
47    overrides: HashMap<String, FailurePolicy>,
48}
49
50impl FailurePolicies {
51    /// A policy set with `default` applied to every index and no overrides.
52    pub fn new(default: FailurePolicy) -> Self {
53        Self {
54            default,
55            overrides: HashMap::new(),
56        }
57    }
58
59    /// Override the policy for one logical index, leaving others on the default.
60    pub fn with_override(mut self, index: impl Into<String>, policy: FailurePolicy) -> Self {
61        self.overrides.insert(index.into(), policy);
62        self
63    }
64
65    /// The effective policy for `index` (its override, else the default).
66    pub fn resolve(&self, index: &str) -> FailurePolicy {
67        self.overrides.get(index).copied().unwrap_or(self.default)
68    }
69}