mfm_machine_test_support/
lib.rs1#![warn(missing_docs)]
2use std::sync::Once;
20
21use mfm_machine::errors::StorageError;
22use mfm_machine::events::{Event, EventEnvelope, KernelEvent, RunStatus};
23use mfm_machine::hashing::artifact_id_for_bytes;
24use mfm_machine::ids::{ArtifactId, OpId, RunId, StateId};
25use mfm_machine::stores::{ArtifactKind, ArtifactStore, EventStore};
26
27const TEST_FILTER_DEFAULT: &str = "warn,mfm=debug";
28const TEST_FILTER_VERBOSE: &str = "debug,mfm=trace";
29
30fn resolve_test_filter<F>(mut lookup: F) -> String
31where
32 F: FnMut(&str) -> Option<String>,
33{
34 if let Some(filter) = lookup("MFM_TEST_LOG_FILTER") {
35 return filter;
36 }
37 if let Some(filter) = lookup("MFM_LOG") {
38 return filter;
39 }
40 if let Some(filter) = lookup("LOG_LEVEL") {
41 return filter;
42 }
43 if let Some(filter) = lookup("RUST_LOG") {
44 return filter;
45 }
46
47 if lookup("MFM_TEST_LOG").is_some() {
48 TEST_FILTER_VERBOSE.to_string()
49 } else {
50 TEST_FILTER_DEFAULT.to_string()
51 }
52}
53
54pub fn init_test_observability() {
60 static INIT: Once = Once::new();
61
62 INIT.call_once(|| {
63 let filter = resolve_test_filter(|name| std::env::var(name).ok());
64
65 let _ = tracing_subscriber::fmt()
66 .with_env_filter(
67 tracing_subscriber::EnvFilter::try_new(filter)
68 .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(TEST_FILTER_DEFAULT)),
69 )
70 .with_test_writer()
71 .with_target(true)
72 .try_init();
73 });
74}
75
76pub async fn artifact_store_contract_tests(store: &dyn ArtifactStore) {
86 put_get_roundtrip(store).await;
87 content_addressed(store).await;
88 exists_and_not_found(store).await;
89}
90
91pub async fn event_store_contract_tests(store: &dyn EventStore) {
100 append_and_read(store).await;
101 expected_seq_concurrency(store).await;
102}
103
104async fn put_get_roundtrip(store: &dyn ArtifactStore) {
105 let bytes = b"hello artifact".to_vec();
106 let id = store
107 .put(ArtifactKind::Other("test".to_string()), bytes.clone())
108 .await
109 .expect("put must succeed");
110
111 assert_eq!(id, artifact_id_for_bytes(&bytes));
112
113 let got = store.get(&id).await.expect("get must succeed");
114 assert_eq!(got, bytes);
115}
116
117async fn content_addressed(store: &dyn ArtifactStore) {
118 let bytes = b"same bytes".to_vec();
119
120 let id1 = store
121 .put(ArtifactKind::Other("k1".to_string()), bytes.clone())
122 .await
123 .expect("put must succeed");
124
125 let id2 = store
126 .put(ArtifactKind::Other("k2".to_string()), bytes.clone())
127 .await
128 .expect("put must succeed");
129
130 assert_eq!(id1, id2);
131 assert_eq!(id1, artifact_id_for_bytes(&bytes));
132}
133
134async fn exists_and_not_found(store: &dyn ArtifactStore) {
135 let missing = ArtifactId("0".repeat(64));
136 assert!(!store.exists(&missing).await.expect("exists must succeed"));
137
138 match store.get(&missing).await {
139 Err(StorageError::NotFound(_)) => {}
140 other => panic!("expected NotFound for missing artifact, got: {other:?}"),
141 }
142}
143
144async fn append_and_read(store: &dyn EventStore) {
145 let run_id = RunId(uuid::Uuid::new_v4());
146
147 assert_eq!(store.head_seq(run_id).await.expect("head_seq"), 0);
148
149 let e1 = EventEnvelope {
150 run_id,
151 seq: 1,
152 ts_millis: Some(1),
153 event: Event::Kernel(KernelEvent::RunStarted {
154 op_id: OpId::must_new("op".to_string()),
155 manifest_id: ArtifactId("0".repeat(64)),
156 initial_snapshot_id: ArtifactId("1".repeat(64)),
157 }),
158 };
159
160 let head = store
161 .append(run_id, 0, vec![e1.clone()])
162 .await
163 .expect("append");
164 assert_eq!(head, 1);
165 assert_eq!(store.head_seq(run_id).await.expect("head_seq"), 1);
166
167 let got = store.read_range(run_id, 1, None).await.expect("read_range");
168 assert_eq!(got, vec![e1.clone()]);
169
170 let e2 = EventEnvelope {
171 run_id,
172 seq: 2,
173 ts_millis: Some(2),
174 event: Event::Kernel(KernelEvent::StateEntered {
175 state_id: StateId::must_new("machine.main.setup".to_string()),
176 attempt: 0,
177 base_snapshot_id: ArtifactId("2".repeat(64)),
178 }),
179 };
180 let e3 = EventEnvelope {
181 run_id,
182 seq: 3,
183 ts_millis: Some(3),
184 event: Event::Kernel(KernelEvent::RunCompleted {
185 status: RunStatus::Completed,
186 final_snapshot_id: None,
187 }),
188 };
189
190 let head = store
191 .append(run_id, 1, vec![e2.clone(), e3.clone()])
192 .await
193 .expect("append");
194 assert_eq!(head, 3);
195
196 let got = store
197 .read_range(run_id, 2, Some(2))
198 .await
199 .expect("read_range");
200 assert_eq!(got, vec![e2]);
201}
202
203async fn expected_seq_concurrency(store: &dyn EventStore) {
204 let run_id = RunId(uuid::Uuid::new_v4());
205
206 let e1 = EventEnvelope {
207 run_id,
208 seq: 1,
209 ts_millis: None,
210 event: Event::Kernel(KernelEvent::RunStarted {
211 op_id: OpId::must_new("op".to_string()),
212 manifest_id: ArtifactId("0".repeat(64)),
213 initial_snapshot_id: ArtifactId("1".repeat(64)),
214 }),
215 };
216
217 let head = store
218 .append(run_id, 0, vec![e1.clone()])
219 .await
220 .expect("append");
221 assert_eq!(head, 1);
222
223 let err = store
225 .append(run_id, 0, vec![e1])
226 .await
227 .expect_err("append should fail");
228 match err {
229 StorageError::Concurrency(_) => {}
230 other => panic!("expected Concurrency error, got: {other:?}"),
231 }
232
233 assert_eq!(store.head_seq(run_id).await.expect("head_seq"), 1);
234}
235
236#[cfg(test)]
237mod tests {
238 use std::collections::HashMap;
239
240 use super::{resolve_test_filter, TEST_FILTER_DEFAULT, TEST_FILTER_VERBOSE};
241
242 fn lookup_from(entries: &[(&str, &str)]) -> impl FnMut(&str) -> Option<String> {
243 let vars: HashMap<String, String> = entries
244 .iter()
245 .map(|(name, value)| ((*name).to_string(), (*value).to_string()))
246 .collect();
247 move |name| vars.get(name).cloned()
248 }
249
250 #[test]
251 fn test_filter_prefers_test_specific_override() {
252 let filter = resolve_test_filter(lookup_from(&[
253 ("MFM_TEST_LOG_FILTER", "trace"),
254 ("MFM_LOG", "warn"),
255 ("LOG_LEVEL", "info"),
256 ]));
257 assert_eq!(filter, "trace");
258 }
259
260 #[test]
261 fn test_filter_prefers_component_override_before_global_level() {
262 let filter = resolve_test_filter(lookup_from(&[
263 ("MFM_LOG", "debug,mfm=trace"),
264 ("LOG_LEVEL", "info"),
265 ("RUST_LOG", "warn"),
266 ]));
267 assert_eq!(filter, "debug,mfm=trace");
268 }
269
270 #[test]
271 fn test_filter_uses_global_level_before_rust_log() {
272 let filter =
273 resolve_test_filter(lookup_from(&[("LOG_LEVEL", "debug"), ("RUST_LOG", "warn")]));
274 assert_eq!(filter, "debug");
275 }
276
277 #[test]
278 fn test_filter_uses_verbose_default_when_enabled() {
279 let filter = resolve_test_filter(lookup_from(&[("MFM_TEST_LOG", "1")]));
280 assert_eq!(filter, TEST_FILTER_VERBOSE);
281 }
282
283 #[test]
284 fn test_filter_uses_quiet_default_when_not_enabled() {
285 let filter = resolve_test_filter(lookup_from(&[]));
286 assert_eq!(filter, TEST_FILTER_DEFAULT);
287 }
288}