Skip to main content

mfm_machine_test_support/
lib.rs

1#![warn(missing_docs)]
2//! Shared contract tests for `mfm-machine` storage traits.
3//!
4//! This crate stays intentionally small so storage backend crates can reuse the same
5//! conformance checks without creating dependency cycles back into the runtime.
6//!
7//! # Examples
8//!
9//! ```rust
10//! use mfm_machine::stores::ArtifactStore;
11//! use mfm_machine_test_support::{artifact_store_contract_tests, init_test_observability};
12//!
13//! async fn assert_artifact_store(store: &dyn ArtifactStore) {
14//!     init_test_observability();
15//!     artifact_store_contract_tests(store).await;
16//! }
17//! ```
18
19use std::sync::Once;
20
21use mfm_machine::errors::StorageError;
22use mfm_machine::events::{Event, EventEnvelope, KernelEvent, RunStatus};
23use mfm_machine::hashing::artifact_id_for_bytes;
24use mfm_machine::ids::{ArtifactId, OpId, RunId, StateId};
25use mfm_machine::stores::{ArtifactKind, ArtifactStore, EventStore};
26
27const TEST_FILTER_DEFAULT: &str = "warn,mfm=debug";
28const TEST_FILTER_VERBOSE: &str = "debug,mfm=trace";
29
30fn resolve_test_filter<F>(mut lookup: F) -> String
31where
32    F: FnMut(&str) -> Option<String>,
33{
34    if let Some(filter) = lookup("MFM_TEST_LOG_FILTER") {
35        return filter;
36    }
37    if let Some(filter) = lookup("MFM_LOG") {
38        return filter;
39    }
40    if let Some(filter) = lookup("LOG_LEVEL") {
41        return filter;
42    }
43    if let Some(filter) = lookup("RUST_LOG") {
44        return filter;
45    }
46
47    if lookup("MFM_TEST_LOG").is_some() {
48        TEST_FILTER_VERBOSE.to_string()
49    } else {
50        TEST_FILTER_DEFAULT.to_string()
51    }
52}
53
54/// Initializes a process-wide tracing subscriber for integration and contract tests.
55///
56/// The filter resolution order matches the repository test contract:
57/// `MFM_TEST_LOG_FILTER`, `MFM_LOG`, `LOG_LEVEL`, `RUST_LOG`, then `MFM_TEST_LOG`.
58/// Repeated calls are harmless and only the first invocation installs the subscriber.
59pub fn init_test_observability() {
60    static INIT: Once = Once::new();
61
62    INIT.call_once(|| {
63        let filter = resolve_test_filter(|name| std::env::var(name).ok());
64
65        let _ = tracing_subscriber::fmt()
66            .with_env_filter(
67                tracing_subscriber::EnvFilter::try_new(filter)
68                    .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(TEST_FILTER_DEFAULT)),
69            )
70            .with_test_writer()
71            .with_target(true)
72            .try_init();
73    });
74}
75
76/// Runs the shared `ArtifactStore` contract suite against a backend.
77///
78/// The suite currently verifies:
79/// - content-addressed writes
80/// - round-trip reads
81/// - missing-artifact behavior for `exists` and `get`
82///
83/// Backend crates typically call this from their own async integration tests after provisioning
84/// a clean store instance for the test case.
85pub async fn artifact_store_contract_tests(store: &dyn ArtifactStore) {
86    put_get_roundtrip(store).await;
87    content_addressed(store).await;
88    exists_and_not_found(store).await;
89}
90
91/// Runs the shared `EventStore` contract suite against a backend.
92///
93/// The suite currently verifies:
94/// - append/read round trips
95/// - optimistic concurrency via `expected_seq`
96///
97/// The supplied store should start from an isolated test database or namespace so the sequence and
98/// concurrency assertions do not interact with events written by other tests.
99pub async fn event_store_contract_tests(store: &dyn EventStore) {
100    append_and_read(store).await;
101    expected_seq_concurrency(store).await;
102}
103
104async fn put_get_roundtrip(store: &dyn ArtifactStore) {
105    let bytes = b"hello artifact".to_vec();
106    let id = store
107        .put(ArtifactKind::Other("test".to_string()), bytes.clone())
108        .await
109        .expect("put must succeed");
110
111    assert_eq!(id, artifact_id_for_bytes(&bytes));
112
113    let got = store.get(&id).await.expect("get must succeed");
114    assert_eq!(got, bytes);
115}
116
117async fn content_addressed(store: &dyn ArtifactStore) {
118    let bytes = b"same bytes".to_vec();
119
120    let id1 = store
121        .put(ArtifactKind::Other("k1".to_string()), bytes.clone())
122        .await
123        .expect("put must succeed");
124
125    let id2 = store
126        .put(ArtifactKind::Other("k2".to_string()), bytes.clone())
127        .await
128        .expect("put must succeed");
129
130    assert_eq!(id1, id2);
131    assert_eq!(id1, artifact_id_for_bytes(&bytes));
132}
133
134async fn exists_and_not_found(store: &dyn ArtifactStore) {
135    let missing = ArtifactId("0".repeat(64));
136    assert!(!store.exists(&missing).await.expect("exists must succeed"));
137
138    match store.get(&missing).await {
139        Err(StorageError::NotFound(_)) => {}
140        other => panic!("expected NotFound for missing artifact, got: {other:?}"),
141    }
142}
143
144async fn append_and_read(store: &dyn EventStore) {
145    let run_id = RunId(uuid::Uuid::new_v4());
146
147    assert_eq!(store.head_seq(run_id).await.expect("head_seq"), 0);
148
149    let e1 = EventEnvelope {
150        run_id,
151        seq: 1,
152        ts_millis: Some(1),
153        event: Event::Kernel(KernelEvent::RunStarted {
154            op_id: OpId::must_new("op".to_string()),
155            manifest_id: ArtifactId("0".repeat(64)),
156            initial_snapshot_id: ArtifactId("1".repeat(64)),
157        }),
158    };
159
160    let head = store
161        .append(run_id, 0, vec![e1.clone()])
162        .await
163        .expect("append");
164    assert_eq!(head, 1);
165    assert_eq!(store.head_seq(run_id).await.expect("head_seq"), 1);
166
167    let got = store.read_range(run_id, 1, None).await.expect("read_range");
168    assert_eq!(got, vec![e1.clone()]);
169
170    let e2 = EventEnvelope {
171        run_id,
172        seq: 2,
173        ts_millis: Some(2),
174        event: Event::Kernel(KernelEvent::StateEntered {
175            state_id: StateId::must_new("machine.main.setup".to_string()),
176            attempt: 0,
177            base_snapshot_id: ArtifactId("2".repeat(64)),
178        }),
179    };
180    let e3 = EventEnvelope {
181        run_id,
182        seq: 3,
183        ts_millis: Some(3),
184        event: Event::Kernel(KernelEvent::RunCompleted {
185            status: RunStatus::Completed,
186            final_snapshot_id: None,
187        }),
188    };
189
190    let head = store
191        .append(run_id, 1, vec![e2.clone(), e3.clone()])
192        .await
193        .expect("append");
194    assert_eq!(head, 3);
195
196    let got = store
197        .read_range(run_id, 2, Some(2))
198        .await
199        .expect("read_range");
200    assert_eq!(got, vec![e2]);
201}
202
203async fn expected_seq_concurrency(store: &dyn EventStore) {
204    let run_id = RunId(uuid::Uuid::new_v4());
205
206    let e1 = EventEnvelope {
207        run_id,
208        seq: 1,
209        ts_millis: None,
210        event: Event::Kernel(KernelEvent::RunStarted {
211            op_id: OpId::must_new("op".to_string()),
212            manifest_id: ArtifactId("0".repeat(64)),
213            initial_snapshot_id: ArtifactId("1".repeat(64)),
214        }),
215    };
216
217    let head = store
218        .append(run_id, 0, vec![e1.clone()])
219        .await
220        .expect("append");
221    assert_eq!(head, 1);
222
223    // ExpectedSeq concurrency: appending with an old expected seq must fail and must not change head.
224    let err = store
225        .append(run_id, 0, vec![e1])
226        .await
227        .expect_err("append should fail");
228    match err {
229        StorageError::Concurrency(_) => {}
230        other => panic!("expected Concurrency error, got: {other:?}"),
231    }
232
233    assert_eq!(store.head_seq(run_id).await.expect("head_seq"), 1);
234}
235
236#[cfg(test)]
237mod tests {
238    use std::collections::HashMap;
239
240    use super::{resolve_test_filter, TEST_FILTER_DEFAULT, TEST_FILTER_VERBOSE};
241
242    fn lookup_from(entries: &[(&str, &str)]) -> impl FnMut(&str) -> Option<String> {
243        let vars: HashMap<String, String> = entries
244            .iter()
245            .map(|(name, value)| ((*name).to_string(), (*value).to_string()))
246            .collect();
247        move |name| vars.get(name).cloned()
248    }
249
250    #[test]
251    fn test_filter_prefers_test_specific_override() {
252        let filter = resolve_test_filter(lookup_from(&[
253            ("MFM_TEST_LOG_FILTER", "trace"),
254            ("MFM_LOG", "warn"),
255            ("LOG_LEVEL", "info"),
256        ]));
257        assert_eq!(filter, "trace");
258    }
259
260    #[test]
261    fn test_filter_prefers_component_override_before_global_level() {
262        let filter = resolve_test_filter(lookup_from(&[
263            ("MFM_LOG", "debug,mfm=trace"),
264            ("LOG_LEVEL", "info"),
265            ("RUST_LOG", "warn"),
266        ]));
267        assert_eq!(filter, "debug,mfm=trace");
268    }
269
270    #[test]
271    fn test_filter_uses_global_level_before_rust_log() {
272        let filter =
273            resolve_test_filter(lookup_from(&[("LOG_LEVEL", "debug"), ("RUST_LOG", "warn")]));
274        assert_eq!(filter, "debug");
275    }
276
277    #[test]
278    fn test_filter_uses_verbose_default_when_enabled() {
279        let filter = resolve_test_filter(lookup_from(&[("MFM_TEST_LOG", "1")]));
280        assert_eq!(filter, TEST_FILTER_VERBOSE);
281    }
282
283    #[test]
284    fn test_filter_uses_quiet_default_when_not_enabled() {
285        let filter = resolve_test_filter(lookup_from(&[]));
286        assert_eq!(filter, TEST_FILTER_DEFAULT);
287    }
288}