Skip to main content

observer_core/
runtime.rs

1// SPDX-FileCopyrightText: 2026 Alexander R. Croft
2// SPDX-License-Identifier: GPL-3.0-or-later
3
4use crate::error::{ObserverError, ObserverResult};
5use crate::inventory::{Inventory, InventoryEntry, InventoryRunner};
6use crate::normalize::{normalized_inventory_sha256, normalized_suite_sha256};
7use crate::providers::{decode_b64_bytes, run_provider_target, ResolvedProviders};
8use crate::report::{
9    derive_case_id, preview_bytes, ActionFail, ActionRecord, ActionStatus, ArtifactRecord,
10    AssertRecord, CaseRecord, ExtractRecord, ReportHeader, ReportMode, ReportRecord, Status, SummaryRecord,
11    TelemetryEntry, TelemetryRecord,
12};
13use crate::suite::{
14    CaseKeyField, CaseSource, CompareOp, Predicate, ResultExpr, SelectionMode, Selector, Statement,
15    SuiteCore, ValueExpr,
16};
17use regex::Regex;
18use serde_json::json;
19use serde_json::Number;
20use std::collections::BTreeMap;
21use std::fs;
22use std::io::{Read, Write};
23use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
24use std::path::{Path, PathBuf};
25use std::process::{Command, Stdio};
26use std::thread;
27use std::time::{Duration, Instant};
28
29#[derive(Debug, Clone)]
30pub struct ExecutionOutcome {
31    pub records: Vec<ReportRecord>,
32    pub exit_code: i32,
33}
34
/// Optional knobs that adjust how a suite is executed.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct ExecutionOptions {
    /// When `true`, telemetry records are appended after successful `run`
    /// and `proc` actions.
    pub emit_telemetry: bool,
    /// When set, only cases whose case key or test name equals this value
    /// are executed; items left without any case are skipped silently.
    pub filter_test_name: Option<String>,
}
40
41#[derive(Debug, Clone, PartialEq)]
42struct ProcRun {
43    exit: i32,
44    out: Vec<u8>,
45    err: Vec<u8>,
46    telemetry: Vec<TelemetryEntry>,
47}
48
/// Response captured by a successful `http_get` action.
#[derive(Clone, Debug, PartialEq)]
struct HttpResp {
    /// Numeric response status.
    status: i64,
    /// Raw response body bytes.
    body: Vec<u8>,
    /// Response headers, ordered by header name.
    headers: BTreeMap<String, String>,
}
55
/// Payload captured by a successful `tcp` action.
#[derive(Clone, Debug, PartialEq)]
struct TcpResp {
    /// Bytes carried by the response.
    bytes: Vec<u8>,
}
60
/// Failure produced by an action; its fields are copied verbatim into the
/// `fail` section of action and artifact report records.
#[derive(Clone, Debug, PartialEq)]
struct RuntimeFail {
    /// Short failure category, e.g. `unknown-test` or `artifact-missing`.
    kind: String,
    /// Human-readable description of the failure.
    msg: String,
    /// Optional additional code attached to the failure.
    code: Option<String>,
}
67
68#[derive(Debug, Clone, PartialEq)]
69enum RuntimeValue {
70    Int(i64),
71    String(String),
72    Bytes(Vec<u8>),
73    Object(BTreeMap<String, RuntimeValue>),
74    ProcRun(ProcRun),
75    HttpResp(HttpResp),
76    TcpResp(TcpResp),
77    Fail(RuntimeFail),
78}
79
/// Artifact registered by a `publish` statement during the current case.
#[derive(Clone, Debug, Eq, PartialEq)]
struct ArtifactBinding {
    /// Name under which the artifact was published.
    name: String,
    /// Declared artifact kind, e.g. `directory`.
    kind: String,
    /// Filesystem path of the artifact.
    path: String,
}
86
87#[derive(Debug, Clone, PartialEq)]
88struct CaseInstance {
89    case_key: String,
90    case_value: RuntimeValue,
91    test_name: String,
92}
93
94#[derive(Debug, Default)]
95struct CaseState {
96    bindings: BTreeMap<String, RuntimeValue>,
97    artifacts: BTreeMap<String, ArtifactBinding>,
98    action_ix: usize,
99    artifact_ix: usize,
100    extract_ix: usize,
101    assert_ix: usize,
102    assert_pass: usize,
103    assert_fail: usize,
104    unhandled_action_fail: usize,
105    records: Vec<ReportRecord>,
106}
107
108pub fn execute_suite(
109    inventory: &Inventory,
110    suite: &SuiteCore,
111    mode: ReportMode,
112) -> ObserverResult<ExecutionOutcome> {
113    execute_suite_with_options(inventory, suite, mode, ExecutionOptions::default())
114}
115
116pub fn execute_suite_with_options(
117    inventory: &Inventory,
118    suite: &SuiteCore,
119    mode: ReportMode,
120    options: ExecutionOptions,
121) -> ObserverResult<ExecutionOutcome> {
122    execute_suite_with_providers_and_options(inventory, suite, mode, None, options)
123}
124
125pub fn execute_suite_with_providers(
126    inventory: &Inventory,
127    suite: &SuiteCore,
128    mode: ReportMode,
129    providers: Option<&ResolvedProviders>,
130) -> ObserverResult<ExecutionOutcome> {
131    execute_suite_with_providers_and_options(
132        inventory,
133        suite,
134        mode,
135        providers,
136        ExecutionOptions::default(),
137    )
138}
139
140pub fn execute_suite_with_providers_and_options(
141    inventory: &Inventory,
142    suite: &SuiteCore,
143    mode: ReportMode,
144    providers: Option<&ResolvedProviders>,
145    options: ExecutionOptions,
146) -> ObserverResult<ExecutionOutcome> {
147    let inventory_sha256 = normalized_inventory_sha256(inventory)?;
148    let suite_sha256 = normalized_suite_sha256(suite)?;
149
150    let mut records = vec![ReportRecord::Header(ReportHeader::new(
151        inventory_sha256,
152        suite_sha256,
153        mode,
154    ))];
155
156    let mut case_pass = 0usize;
157    let mut case_fail = 0usize;
158    let mut assert_pass = 0usize;
159    let mut assert_fail = 0usize;
160
161    for item in &suite.items {
162        let mut selected = discover_cases(inventory, &item.case_source)?;
163        if let Some(filter) = options.filter_test_name.as_deref() {
164            selected.retain(|case| case.case_key == filter || case.test_name == filter);
165        }
166        if selected.is_empty() && matches!(item.selection_mode, SelectionMode::Required) {
167            if options.filter_test_name.is_some() {
168                continue;
169            }
170            return Err(ObserverError::Runtime(format!(
171                "required case source for item `{}` produced no cases",
172                item.item_id
173            )));
174        }
175
176        for case in selected {
177            let case_id = derive_case_id(&item.item_id, &case.case_key);
178            let mut state = CaseState::default();
179            state
180                .bindings
181                .insert(item.case_binding.clone(), case.case_value.clone());
182
183            for statement in &item.body {
184                evaluate_statement(statement, inventory, providers, &case_id, &mut state, &options)?;
185            }
186
187            let status = if state.assert_fail == 0 && state.unhandled_action_fail == 0 {
188                case_pass += 1;
189                Status::Pass
190            } else {
191                case_fail += 1;
192                Status::Fail
193            };
194
195            assert_pass += state.assert_pass;
196            assert_fail += state.assert_fail;
197            records.append(&mut state.records);
198            records.push(ReportRecord::Case(CaseRecord {
199                case_id,
200                item_id: item.item_id.clone(),
201                test_name: case.test_name,
202                case_key: match item.case_source {
203                    CaseSource::Inventory { .. } => None,
204                    CaseSource::Files { .. } => Some(case.case_key),
205                },
206                status,
207                assert_pass: state.assert_pass,
208                assert_fail: state.assert_fail,
209                unhandled_action_fail: state.unhandled_action_fail,
210            }));
211        }
212    }
213
214    let exit_code = if case_fail == 0 { 0 } else { 1 };
215    records.push(ReportRecord::Summary(SummaryRecord {
216        case_pass,
217        case_fail,
218        assert_pass,
219        assert_fail,
220        exit_code,
221    }));
222
223    Ok(ExecutionOutcome { records, exit_code })
224}
225
226fn discover_cases(inventory: &Inventory, case_source: &CaseSource) -> ObserverResult<Vec<CaseInstance>> {
227    match case_source {
228        CaseSource::Inventory { selector } => {
229            let mut names = Vec::new();
230            for entry in &inventory.entries {
231                if matches_selector(&entry.name, selector)? {
232                    names.push(entry.name.clone());
233                }
234            }
235            names.sort_by(|left, right| left.as_bytes().cmp(right.as_bytes()));
236            Ok(names
237                .into_iter()
238                .map(|name| CaseInstance {
239                    case_key: name.clone(),
240                    case_value: RuntimeValue::String(name.clone()),
241                    test_name: name,
242                })
243                .collect())
244        }
245        CaseSource::Files {
246            root,
247            glob,
248            key_field,
249        } => discover_file_cases(root, glob, *key_field),
250    }
251}
252
253fn matches_selector(name: &str, selector: &Selector) -> ObserverResult<bool> {
254    match selector {
255        Selector::Exact { value } => Ok(name == value),
256        Selector::Prefix { value } => Ok(name.starts_with(value)),
257        Selector::Glob { value } => Ok(matches_glob(name, value)),
258        Selector::Regex { value } => Regex::new(value)
259            .map(|regex| regex.is_match(name))
260            .map_err(|error| ObserverError::Runtime(format!("invalid selector regex: {error}"))),
261        Selector::Any { items } => {
262            for item in items {
263                if matches_selector(name, item)? {
264                    return Ok(true);
265                }
266            }
267            Ok(false)
268        }
269        Selector::All { items } => {
270            for item in items {
271                if !matches_selector(name, item)? {
272                    return Ok(false);
273                }
274            }
275            Ok(true)
276        }
277    }
278}
279
280fn matches_glob(name: &str, pattern: &str) -> bool {
281    matches_glob_bytes(name.as_bytes(), pattern.as_bytes())
282}
283
284fn discover_file_cases(root: &str, glob: &str, key_field: CaseKeyField) -> ObserverResult<Vec<CaseInstance>> {
285    let root_path = PathBuf::from(root);
286    let mut rel_paths = Vec::new();
287    collect_relative_files(&root_path, &root_path, &mut rel_paths)?;
288    rel_paths.sort_by(|left, right| left.as_bytes().cmp(right.as_bytes()));
289
290    let mut cases = Vec::new();
291    for rel_path in rel_paths {
292        if !matches_glob(&rel_path, glob) {
293            continue;
294        }
295        let path = Path::new(&rel_path);
296        let name = path
297            .file_name()
298            .and_then(|value| value.to_str())
299            .ok_or_else(|| ObserverError::Runtime(format!("invalid discovered file name `{rel_path}`")))?
300            .to_owned();
301        let stem = path
302            .file_stem()
303            .and_then(|value| value.to_str())
304            .ok_or_else(|| ObserverError::Runtime(format!("invalid discovered file stem `{rel_path}`")))?
305            .to_owned();
306        let ext = path
307            .extension()
308            .and_then(|value| value.to_str())
309            .unwrap_or("")
310            .to_owned();
311        let case_key = match key_field {
312            CaseKeyField::Path => rel_path.clone(),
313            CaseKeyField::Name => name.clone(),
314            CaseKeyField::Stem => stem.clone(),
315        };
316        let mut fields = BTreeMap::new();
317        fields.insert("key".to_owned(), RuntimeValue::String(case_key.clone()));
318        fields.insert("path".to_owned(), RuntimeValue::String(rel_path.clone()));
319        fields.insert("name".to_owned(), RuntimeValue::String(name));
320        fields.insert("stem".to_owned(), RuntimeValue::String(stem));
321        fields.insert("ext".to_owned(), RuntimeValue::String(ext));
322        cases.push(CaseInstance {
323            case_key: case_key.clone(),
324            test_name: case_key,
325            case_value: RuntimeValue::Object(fields),
326        });
327    }
328    Ok(cases)
329}
330
331fn collect_relative_files(root: &Path, current: &Path, out: &mut Vec<String>) -> ObserverResult<()> {
332    let entries = fs::read_dir(current)
333        .map_err(|error| ObserverError::Runtime(format!("failed to read `{}`: {error}", current.display())))?;
334    let mut children = Vec::new();
335    for entry in entries {
336        let entry = entry
337            .map_err(|error| ObserverError::Runtime(format!("failed to enumerate `{}`: {error}", current.display())))?;
338        children.push(entry.path());
339    }
340    children.sort_by(|left, right| left.as_os_str().as_encoded_bytes().cmp(right.as_os_str().as_encoded_bytes()));
341
342    for path in children {
343        let metadata = fs::metadata(&path)
344            .map_err(|error| ObserverError::Runtime(format!("failed to stat `{}`: {error}", path.display())))?;
345        if metadata.is_dir() {
346            collect_relative_files(root, &path, out)?;
347        } else if metadata.is_file() {
348            let rel = path
349                .strip_prefix(root)
350                .map_err(|error| ObserverError::Runtime(format!("failed to relativize `{}`: {error}", path.display())))?;
351            out.push(normalize_relative_path(rel));
352        }
353    }
354    Ok(())
355}
356
/// Renders `path` with `/` between its components regardless of platform.
///
/// Components that are not valid UTF-8 are converted lossily.
fn normalize_relative_path(path: &Path) -> String {
    let mut normalized = String::new();
    for (index, part) in path.components().enumerate() {
        if index > 0 {
            normalized.push('/');
        }
        normalized.push_str(&part.as_os_str().to_string_lossy());
    }
    normalized
}
363
/// Matches `text` against a byte-oriented glob `pattern`.
///
/// Supported syntax:
/// - `*` matches any run of bytes, including `/` and the empty run;
/// - `?` matches exactly one byte;
/// - a `**/` prefix at any pattern position may additionally match nothing,
///   so `**/x` also matches a bare `x`;
/// - every other byte matches itself.
///
/// The match is computed with a table over (pattern suffix, text suffix)
/// pairs, so the cost is bounded by `text.len() * pattern.len()` steps.
/// Plain recursive backtracking is exponential for patterns that contain
/// several `*`, which matters because patterns come from suite definitions.
fn matches_glob_bytes(text: &[u8], pattern: &[u8]) -> bool {
    let text_len = text.len();
    let width = text_len + 1;

    // table[j * width + i] is true iff text[i..] matches pattern[j..].
    let mut table = vec![false; (pattern.len() + 1) * width];
    // The empty pattern matches only the empty remainder of the text.
    table[pattern.len() * width + text_len] = true;

    for j in (0..pattern.len()).rev() {
        // `row` is the row being filled; `lower` holds the rows for longer
        // pattern prefixes consumed (j + 1, j + 2, ...), already complete.
        let (upper, lower) = table.split_at_mut((j + 1) * width);
        let row = &mut upper[j * width..];
        let next = &lower[..width];

        match pattern[j] {
            b'*' => {
                // With a `**/` prefix the three bytes may match nothing at
                // all, i.e. the text may continue directly at pattern[j + 3..].
                let skip_row = pattern[j..]
                    .starts_with(b"**/")
                    .then(|| &lower[2 * width..3 * width]);
                // `*` consumes text[i..k] for some k >= i, so the answer is
                // the running OR of the next row over all k >= i.
                let mut tail_matches = false;
                for i in (0..=text_len).rev() {
                    tail_matches |= next[i];
                    row[i] = tail_matches || skip_row.map_or(false, |skip| skip[i]);
                }
            }
            b'?' => {
                // Needs one byte of text; the entry at `text_len` stays false.
                for i in 0..text_len {
                    row[i] = next[i + 1];
                }
            }
            literal => {
                for i in 0..text_len {
                    row[i] = text[i] == literal && next[i + 1];
                }
            }
        }
    }

    table[0]
}
386
387fn evaluate_statement(
388    statement: &Statement,
389    inventory: &Inventory,
390    providers: Option<&ResolvedProviders>,
391    case_id: &str,
392    state: &mut CaseState,
393    options: &ExecutionOptions,
394) -> ObserverResult<()> {
395    match statement {
396        Statement::Assert { predicate } => {
397            let (status, msg) = evaluate_assert(predicate, state)?;
398            let record = ReportRecord::Assert(AssertRecord {
399                case_id: case_id.to_owned(),
400                assert_ix: state.assert_ix,
401                status,
402                msg,
403            });
404            state.assert_ix += 1;
405            match status {
406                Status::Pass => state.assert_pass += 1,
407                Status::Fail => state.assert_fail += 1,
408            }
409            state.records.push(record);
410            Ok(())
411        }
412        Statement::Publish {
413            name,
414            artifact_kind,
415            path,
416        } => {
417            let action_ix = state.action_ix;
418            state.action_ix += 1;
419            let path = as_string(eval_value(path, &state.bindings, &state.artifacts)?)?;
420            match validate_artifact_path(&path, artifact_kind) {
421                Ok(()) => {
422                    let binding = ArtifactBinding {
423                        name: name.clone(),
424                        kind: artifact_kind.clone(),
425                        path: path.clone(),
426                    };
427                    state.artifacts.insert(name.clone(), binding);
428                    state.records.push(ReportRecord::Action(ActionRecord {
429                        case_id: case_id.to_owned(),
430                        action_ix,
431                        action: "artifact_publish".to_owned(),
432                        status: ActionStatus::Ok,
433                        args: json!({"name": name, "kind": artifact_kind, "path": path}),
434                        ok: Some(json!({"name": name, "kind": artifact_kind, "path": path})),
435                        fail: None,
436                    }));
437                    state.records.push(ReportRecord::Artifact(ArtifactRecord {
438                        case_id: case_id.to_owned(),
439                        artifact_ix: state.artifact_ix,
440                        action_ix,
441                        name: name.clone(),
442                        event: "publish".to_owned(),
443                        kind: artifact_kind.clone(),
444                        status: ActionStatus::Ok,
445                        location: Some(json!({"path": path})),
446                        fail: None,
447                    }));
448                    state.artifact_ix += 1;
449                    Ok(())
450                }
451                Err(fail) => {
452                    state.records.push(ReportRecord::Action(ActionRecord {
453                        case_id: case_id.to_owned(),
454                        action_ix,
455                        action: "artifact_publish".to_owned(),
456                        status: ActionStatus::Fail,
457                        args: json!({"name": name, "kind": artifact_kind, "path": path}),
458                        ok: None,
459                        fail: Some(ActionFail {
460                            kind: fail.kind.clone(),
461                            msg: fail.msg.clone(),
462                            code: fail.code.clone(),
463                        }),
464                    }));
465                    state.records.push(ReportRecord::Artifact(ArtifactRecord {
466                        case_id: case_id.to_owned(),
467                        artifact_ix: state.artifact_ix,
468                        action_ix,
469                        name: name.clone(),
470                        event: "publish".to_owned(),
471                        kind: artifact_kind.clone(),
472                        status: ActionStatus::Fail,
473                        location: Some(json!({"path": path})),
474                        fail: Some(ActionFail {
475                            kind: fail.kind.clone(),
476                            msg: fail.msg.clone(),
477                            code: fail.code.clone(),
478                        }),
479                    }));
480                    state.artifact_ix += 1;
481                    Err(ObserverError::Runtime(format!("{}: {}", fail.kind, fail.msg)))
482                }
483            }
484        }
485        Statement::ResultBranch {
486            result,
487            ok_binding,
488            ok,
489            fail_binding,
490            fail,
491        } => {
492            match execute_result(result, inventory, providers, case_id, state, options) {
493                Ok(value) => {
494                    let previous = state.bindings.insert(ok_binding.clone(), value);
495                    for nested in ok {
496                        evaluate_statement(nested, inventory, providers, case_id, state, options)?;
497                    }
498                    restore_binding(&mut state.bindings, ok_binding, previous);
499                }
500                Err(runtime_fail) => {
501                    if fail.is_empty() {
502                        state.unhandled_action_fail += 1;
503                    } else if let Some(binding) = fail_binding {
504                        let previous = state
505                            .bindings
506                            .insert(binding.clone(), RuntimeValue::Fail(runtime_fail));
507                        for nested in fail {
508                            evaluate_statement(nested, inventory, providers, case_id, state, options)?;
509                        }
510                        restore_binding(&mut state.bindings, binding, previous);
511                    }
512                }
513            }
514            Ok(())
515        }
516        Statement::BoolBranch {
517            predicate,
518            if_true,
519            if_false,
520        } => {
521            if evaluate_predicate(predicate, state)? {
522                for nested in if_true {
523                    evaluate_statement(nested, inventory, providers, case_id, state, options)?;
524                }
525            } else {
526                for nested in if_false {
527                    evaluate_statement(nested, inventory, providers, case_id, state, options)?;
528                }
529            }
530            Ok(())
531        }
532    }
533}
534
535fn restore_binding(
536    bindings: &mut BTreeMap<String, RuntimeValue>,
537    key: &str,
538    previous: Option<RuntimeValue>,
539) {
540    if let Some(value) = previous {
541        bindings.insert(key.to_owned(), value);
542    } else {
543        bindings.remove(key);
544    }
545}
546
547fn execute_result(
548    result: &ResultExpr,
549    inventory: &Inventory,
550    providers: Option<&ResolvedProviders>,
551    case_id: &str,
552    state: &mut CaseState,
553    options: &ExecutionOptions,
554) -> Result<RuntimeValue, RuntimeFail> {
555    match result {
556        ResultExpr::Run { test, timeout_ms } => {
557            let test_name = as_string(eval_value(test, &state.bindings, &state.artifacts).map_err(to_runtime_fail)?)
558                .map_err(to_runtime_fail)?;
559            let entry = inventory
560                .entries
561                .iter()
562                .find(|entry| entry.name == test_name)
563                .ok_or_else(|| RuntimeFail {
564                    kind: "unknown-test".to_owned(),
565                    msg: "unknown test".to_owned(),
566                    code: None,
567                })?;
568            let action_ix = state.action_ix;
569            state.action_ix += 1;
570            match execute_inventory_entry(entry, providers, *timeout_ms) {
571                Ok(run) => {
572                    let telemetry = run.telemetry.clone();
573                    state.records.push(ReportRecord::Action(ActionRecord {
574                        case_id: case_id.to_owned(),
575                        action_ix,
576                        action: "run".to_owned(),
577                        status: ActionStatus::Ok,
578                        args: json!({"test_name": test_name, "timeout_ms": timeout_ms}),
579                        ok: Some(proc_ok_payload(&run)),
580                        fail: None,
581                    }));
582                    if options.emit_telemetry {
583                        append_action_telemetry_records(&mut state.records, case_id, action_ix, &telemetry);
584                    }
585                    Ok(RuntimeValue::ProcRun(run))
586                }
587                Err(fail) => {
588                    state.records.push(ReportRecord::Action(ActionRecord {
589                        case_id: case_id.to_owned(),
590                        action_ix,
591                        action: "run".to_owned(),
592                        status: ActionStatus::Fail,
593                        args: json!({"test_name": test_name, "timeout_ms": timeout_ms}),
594                        ok: None,
595                        fail: Some(ActionFail {
596                            kind: fail.kind.clone(),
597                            msg: fail.msg.clone(),
598                            code: fail.code.clone(),
599                        }),
600                    }));
601                    Err(fail)
602                }
603            }
604        }
605        ResultExpr::Proc {
606            path,
607            args,
608            timeout_ms,
609        } => {
610            let path = as_string(eval_value(path, &state.bindings, &state.artifacts).map_err(to_runtime_fail)?)
611                .map_err(to_runtime_fail)?;
612            let resolved_args = evaluate_proc_args(args, &state.bindings, &state.artifacts)?;
613            let action_ix = state.action_ix;
614            state.action_ix += 1;
615            match execute_process(&path, &resolved_args, *timeout_ms) {
616                Ok(run) => {
617                    let telemetry = run.telemetry.clone();
618                    state.records.push(ReportRecord::Action(ActionRecord {
619                        case_id: case_id.to_owned(),
620                        action_ix,
621                        action: "proc".to_owned(),
622                        status: ActionStatus::Ok,
623                        args: json!({"path": path, "args": resolved_args, "timeout_ms": timeout_ms}),
624                        ok: Some(proc_ok_payload(&run)),
625                        fail: None,
626                    }));
627                    if options.emit_telemetry {
628                        append_action_telemetry_records(&mut state.records, case_id, action_ix, &telemetry);
629                    }
630                    Ok(RuntimeValue::ProcRun(run))
631                }
632                Err(fail) => {
633                    state.records.push(ReportRecord::Action(ActionRecord {
634                        case_id: case_id.to_owned(),
635                        action_ix,
636                        action: "proc".to_owned(),
637                        status: ActionStatus::Fail,
638                        args: json!({"path": path, "args": resolved_args, "timeout_ms": timeout_ms}),
639                        ok: None,
640                        fail: Some(ActionFail {
641                            kind: fail.kind.clone(),
642                            msg: fail.msg.clone(),
643                            code: fail.code.clone(),
644                        }),
645                    }));
646                    Err(fail)
647                }
648            }
649        }
650        ResultExpr::HttpGet { url, timeout_ms } => {
651            let url = as_string(eval_value(url, &state.bindings, &state.artifacts).map_err(to_runtime_fail)?)
652                .map_err(to_runtime_fail)?;
653            let action_ix = state.action_ix;
654            state.action_ix += 1;
655            match execute_http_get(&url, *timeout_ms) {
656                Ok(response) => {
657                    state.records.push(ReportRecord::Action(ActionRecord {
658                        case_id: case_id.to_owned(),
659                        action_ix,
660                        action: "http_get".to_owned(),
661                        status: ActionStatus::Ok,
662                        args: json!({"url": url, "timeout_ms": timeout_ms}),
663                        ok: Some(http_ok_payload(&response)),
664                        fail: None,
665                    }));
666                    Ok(RuntimeValue::HttpResp(response))
667                }
668                Err(fail) => {
669                    state.records.push(ReportRecord::Action(ActionRecord {
670                        case_id: case_id.to_owned(),
671                        action_ix,
672                        action: "http_get".to_owned(),
673                        status: ActionStatus::Fail,
674                        args: json!({"url": url, "timeout_ms": timeout_ms}),
675                        ok: None,
676                        fail: Some(ActionFail {
677                            kind: fail.kind.clone(),
678                            msg: fail.msg.clone(),
679                            code: fail.code.clone(),
680                        }),
681                    }));
682                    Err(fail)
683                }
684            }
685        }
686        ResultExpr::Tcp {
687            address,
688            send,
689            recv_max,
690            timeout_ms,
691        } => {
692            let address = as_string(eval_value(address, &state.bindings, &state.artifacts).map_err(to_runtime_fail)?)
693                .map_err(to_runtime_fail)?;
694            let send = as_bytes(eval_value(send, &state.bindings, &state.artifacts).map_err(to_runtime_fail)?)
695                .map_err(to_runtime_fail)?;
696            let action_ix = state.action_ix;
697            state.action_ix += 1;
698            match execute_tcp(&address, &send, *recv_max as usize, *timeout_ms) {
699                Ok(response) => {
700                    state.records.push(ReportRecord::Action(ActionRecord {
701                        case_id: case_id.to_owned(),
702                        action_ix,
703                        action: "tcp".to_owned(),
704                        status: ActionStatus::Ok,
705                        args: json!({
706                            "address": address,
707                            "send_len": send.len(),
708                            "recv_max": recv_max,
709                            "timeout_ms": timeout_ms
710                        }),
711                        ok: Some(tcp_ok_payload(&response)),
712                        fail: None,
713                    }));
714                    Ok(RuntimeValue::TcpResp(response))
715                }
716                Err(fail) => {
717                    state.records.push(ReportRecord::Action(ActionRecord {
718                        case_id: case_id.to_owned(),
719                        action_ix,
720                        action: "tcp".to_owned(),
721                        status: ActionStatus::Fail,
722                        args: json!({
723                            "address": address,
724                            "send_len": send.len(),
725                            "recv_max": recv_max,
726                            "timeout_ms": timeout_ms
727                        }),
728                        ok: None,
729                        fail: Some(ActionFail {
730                            kind: fail.kind.clone(),
731                            msg: fail.msg.clone(),
732                            code: fail.code.clone(),
733                        }),
734                    }));
735                    Err(fail)
736                }
737            }
738        }
739        ResultExpr::ArtifactCheck { name, artifact_kind } => {
740            let action_ix = state.action_ix;
741            state.action_ix += 1;
742            match check_artifact(name, artifact_kind, &state.artifacts) {
743                Ok(binding) => {
744                    state.records.push(ReportRecord::Action(ActionRecord {
745                        case_id: case_id.to_owned(),
746                        action_ix,
747                        action: "artifact_check".to_owned(),
748                        status: ActionStatus::Ok,
749                        args: json!({"name": name, "kind": artifact_kind}),
750                        ok: Some(json!({"name": binding.name, "kind": binding.kind, "path": binding.path})),
751                        fail: None,
752                    }));
753                    state.records.push(ReportRecord::Artifact(ArtifactRecord {
754                        case_id: case_id.to_owned(),
755                        artifact_ix: state.artifact_ix,
756                        action_ix,
757                        name: name.clone(),
758                        event: "check".to_owned(),
759                        kind: artifact_kind.clone(),
760                        status: ActionStatus::Ok,
761                        location: Some(json!({"path": binding.path})),
762                        fail: None,
763                    }));
764                    state.artifact_ix += 1;
765                    Ok(artifact_binding_value(&binding))
766                }
767                Err(fail) => {
768                    state.records.push(ReportRecord::Action(ActionRecord {
769                        case_id: case_id.to_owned(),
770                        action_ix,
771                        action: "artifact_check".to_owned(),
772                        status: ActionStatus::Fail,
773                        args: json!({"name": name, "kind": artifact_kind}),
774                        ok: None,
775                        fail: Some(ActionFail {
776                            kind: fail.kind.clone(),
777                            msg: fail.msg.clone(),
778                            code: fail.code.clone(),
779                        }),
780                    }));
781                    state.records.push(ReportRecord::Artifact(ArtifactRecord {
782                        case_id: case_id.to_owned(),
783                        artifact_ix: state.artifact_ix,
784                        action_ix,
785                        name: name.clone(),
786                        event: "check".to_owned(),
787                        kind: artifact_kind.clone(),
788                        status: ActionStatus::Fail,
789                        location: None,
790                        fail: Some(ActionFail {
791                            kind: fail.kind.clone(),
792                            msg: fail.msg.clone(),
793                            code: fail.code.clone(),
794                        }),
795                    }));
796                    state.artifact_ix += 1;
797                    Err(fail)
798                }
799            }
800        }
801        ResultExpr::ExtractJson { name, select } => {
802            execute_extract(name, select, false, case_id, state)
803        }
804        ResultExpr::ExtractJsonl { name, select } => {
805            execute_extract(name, select, true, case_id, state)
806        }
807    }
808}
809
810fn evaluate_proc_args(
811    args: &[ValueExpr],
812    bindings: &BTreeMap<String, RuntimeValue>,
813    artifacts: &BTreeMap<String, ArtifactBinding>,
814) -> Result<Vec<String>, RuntimeFail> {
815    let mut resolved = Vec::with_capacity(args.len());
816    for arg in args {
817        let value = eval_value(arg, bindings, artifacts).map_err(to_runtime_fail)?;
818        resolved.push(as_string(value).map_err(to_runtime_fail)?);
819    }
820    Ok(resolved)
821}
822
823fn validate_artifact_path(path: &str, artifact_kind: &str) -> Result<(), RuntimeFail> {
824    let metadata = fs::metadata(path).map_err(|error| RuntimeFail {
825        kind: "artifact-missing".to_owned(),
826        msg: format!("artifact `{path}` is missing: {error}"),
827        code: None,
828    })?;
829    match artifact_kind {
830        "directory" => {
831            if metadata.is_dir() {
832                Ok(())
833            } else {
834                Err(RuntimeFail {
835                    kind: "artifact-kind".to_owned(),
836                    msg: format!("artifact `{path}` is not a directory"),
837                    code: None,
838                })
839            }
840        }
841        _ => {
842            if metadata.is_file() {
843                Ok(())
844            } else {
845                Err(RuntimeFail {
846                    kind: "artifact-kind".to_owned(),
847                    msg: format!("artifact `{path}` is not a file"),
848                    code: None,
849                })
850            }
851        }
852    }
853}
854
855fn check_artifact(
856    name: &str,
857    artifact_kind: &str,
858    artifacts: &BTreeMap<String, ArtifactBinding>,
859) -> Result<ArtifactBinding, RuntimeFail> {
860    let binding = artifacts.get(name).cloned().ok_or_else(|| RuntimeFail {
861        kind: "artifact-missing".to_owned(),
862        msg: format!("unknown artifact `{name}`"),
863        code: None,
864    })?;
865    if binding.kind != artifact_kind {
866        return Err(RuntimeFail {
867            kind: "artifact-kind".to_owned(),
868            msg: format!(
869                "artifact `{name}` has kind `{}`, expected `{artifact_kind}`",
870                binding.kind
871            ),
872            code: None,
873        });
874    }
875    validate_artifact_path(&binding.path, artifact_kind)?;
876    Ok(binding)
877}
878
879fn artifact_binding_value(binding: &ArtifactBinding) -> RuntimeValue {
880    RuntimeValue::Object(BTreeMap::from([
881        ("name".to_owned(), RuntimeValue::String(binding.name.clone())),
882        ("kind".to_owned(), RuntimeValue::String(binding.kind.clone())),
883        ("path".to_owned(), RuntimeValue::String(binding.path.clone())),
884    ]))
885}
886
887fn execute_extract(
888    name: &str,
889    select: &str,
890    jsonl: bool,
891    case_id: &str,
892    state: &mut CaseState,
893) -> Result<RuntimeValue, RuntimeFail> {
894    let binding = check_artifact(name, if jsonl { "jsonl" } else { "json" }, &state.artifacts)?;
895    let action_ix = state.action_ix;
896    state.action_ix += 1;
897    match extract_value(&binding.path, select, jsonl) {
898        Ok((match_count, value_text)) => {
899            let action_name = if jsonl { "extract_jsonl" } else { "extract_json" };
900            let format = if jsonl { "jsonl" } else { "json" };
901            let (preview_len, preview_b64, truncated) = preview_bytes(value_text.as_bytes());
902            let value_text_field = if preview_len == 0 { None } else { Some(value_text.clone()) };
903            state.records.push(ReportRecord::Action(ActionRecord {
904                case_id: case_id.to_owned(),
905                action_ix,
906                action: action_name.to_owned(),
907                status: ActionStatus::Ok,
908                args: json!({"name": name, "select": select}),
909                ok: Some(json!({
910                    "source_artifact": name,
911                    "select": select,
912                    "match_count": match_count,
913                    "value_text": value_text_field,
914                    "value_text_truncated": truncated
915                })),
916                fail: None,
917            }));
918            state.records.push(ReportRecord::Extract(ExtractRecord {
919                case_id: case_id.to_owned(),
920                extract_ix: state.extract_ix,
921                action_ix,
922                source_artifact: name.to_owned(),
923                format: format.to_owned(),
924                status: ActionStatus::Ok,
925                select: Some(select.to_owned()),
926                summary: Some(json!({
927                    "match_count": match_count,
928                    "value_text": preview_b64.map(|_| value_text),
929                    "value_text_truncated": truncated
930                })),
931                fail: None,
932            }));
933            state.extract_ix += 1;
934            Ok(RuntimeValue::Object(BTreeMap::from([
935                ("format".to_owned(), RuntimeValue::String(format.to_owned())),
936                ("select".to_owned(), RuntimeValue::String(select.to_owned())),
937                ("count".to_owned(), RuntimeValue::Int(match_count as i64)),
938                ("text".to_owned(), RuntimeValue::String(value_text_field.unwrap_or_default())),
939            ])))
940        }
941        Err(fail) => {
942            let action_name = if jsonl { "extract_jsonl" } else { "extract_json" };
943            let format = if jsonl { "jsonl" } else { "json" };
944            state.records.push(ReportRecord::Action(ActionRecord {
945                case_id: case_id.to_owned(),
946                action_ix,
947                action: action_name.to_owned(),
948                status: ActionStatus::Fail,
949                args: json!({"name": name, "select": select}),
950                ok: None,
951                fail: Some(ActionFail {
952                    kind: fail.kind.clone(),
953                    msg: fail.msg.clone(),
954                    code: fail.code.clone(),
955                }),
956            }));
957            state.records.push(ReportRecord::Extract(ExtractRecord {
958                case_id: case_id.to_owned(),
959                extract_ix: state.extract_ix,
960                action_ix,
961                source_artifact: name.to_owned(),
962                format: format.to_owned(),
963                status: ActionStatus::Fail,
964                select: Some(select.to_owned()),
965                summary: None,
966                fail: Some(ActionFail {
967                    kind: fail.kind.clone(),
968                    msg: fail.msg.clone(),
969                    code: fail.code.clone(),
970                }),
971            }));
972            state.extract_ix += 1;
973            Err(fail)
974        }
975    }
976}
977
978fn extract_value(path: &str, select: &str, jsonl: bool) -> Result<(usize, String), RuntimeFail> {
979    let source = fs::read_to_string(path).map_err(|error| RuntimeFail {
980        kind: "io".to_owned(),
981        msg: format!("failed to read `{path}`: {error}"),
982        code: None,
983    })?;
984    let mut matches = Vec::new();
985    if jsonl {
986        for line in source.lines() {
987            if line.trim().is_empty() {
988                continue;
989            }
990            let value = serde_json::from_str::<serde_json::Value>(line).map_err(|error| RuntimeFail {
991                kind: "protocol".to_owned(),
992                msg: format!("invalid jsonl in `{path}`: {error}"),
993                code: None,
994            })?;
995            if let Some(found) = select_json_value(&value, select) {
996                matches.push(found);
997            }
998        }
999    } else {
1000        let value = serde_json::from_str::<serde_json::Value>(&source).map_err(|error| RuntimeFail {
1001            kind: "protocol".to_owned(),
1002            msg: format!("invalid json in `{path}`: {error}"),
1003            code: None,
1004        })?;
1005        if let Some(found) = select_json_value(&value, select) {
1006            matches.push(found);
1007        }
1008    }
1009    let first = matches.first().cloned().unwrap_or_else(|| "".to_owned());
1010    Ok((matches.len(), first))
1011}
1012
1013fn select_json_value(value: &serde_json::Value, select: &str) -> Option<String> {
1014    let path = select.strip_prefix("$.")?;
1015    let mut current = value;
1016    for segment in path.split('.') {
1017        current = current.get(segment)?;
1018    }
1019    Some(match current {
1020        serde_json::Value::String(text) => text.clone(),
1021        other => other.to_string(),
1022    })
1023}
1024
1025fn normalize_joined_path(parts: &[String]) -> String {
1026    let mut path = PathBuf::new();
1027    for part in parts {
1028        path.push(part);
1029    }
1030    normalize_relative_path(&path)
1031}
1032
1033fn execute_inventory_entry(
1034    entry: &InventoryEntry,
1035    providers: Option<&ResolvedProviders>,
1036    timeout_ms: u32,
1037) -> Result<ProcRun, RuntimeFail> {
1038    match &entry.runner {
1039        InventoryRunner::Exec { path, args } => execute_process(path, args, timeout_ms),
1040        InventoryRunner::Sh { command } => execute_process("/bin/sh", &["-c".to_owned(), command.clone()], timeout_ms),
1041        InventoryRunner::Provider { provider, target } => {
1042            let providers = providers.ok_or_else(|| RuntimeFail {
1043                kind: "provider-config".to_owned(),
1044                msg: "provider config required".to_owned(),
1045                code: None,
1046            })?;
1047            let resolved = providers.get(provider).ok_or_else(|| RuntimeFail {
1048                kind: "provider-config".to_owned(),
1049                msg: format!("unresolved provider `{provider}`"),
1050                code: None,
1051            })?;
1052            let response = run_provider_target(resolved, target, timeout_ms).map_err(to_runtime_fail)?;
1053            let out = decode_b64_bytes(&response.out_b64).map_err(to_runtime_fail)?;
1054            let err = decode_b64_bytes(&response.err_b64).map_err(to_runtime_fail)?;
1055            Ok(ProcRun {
1056                exit: response.exit,
1057                out,
1058                err,
1059                telemetry: response.telemetry,
1060            })
1061        }
1062    }
1063}
1064
1065fn execute_process(path: &str, args: &[String], timeout_ms: u32) -> Result<ProcRun, RuntimeFail> {
1066    let started_at = Instant::now();
1067    let usage_before = child_resource_usage();
1068    let mut child = Command::new(path)
1069        .args(args)
1070        .stdout(Stdio::piped())
1071        .stderr(Stdio::piped())
1072        .spawn()
1073        .map_err(|error| RuntimeFail {
1074            kind: "spawn".to_owned(),
1075            msg: error.to_string(),
1076            code: None,
1077        })?;
1078
1079    let deadline = Instant::now() + Duration::from_millis(u64::from(timeout_ms));
1080    loop {
1081        match child.try_wait() {
1082            Ok(Some(_)) => {
1083                let output = child.wait_with_output().map_err(|error| RuntimeFail {
1084                    kind: "io".to_owned(),
1085                    msg: error.to_string(),
1086                    code: None,
1087                })?;
1088                let usage_after = child_resource_usage();
1089                return Ok(ProcRun {
1090                    exit: output.status.code().unwrap_or(-1),
1091                    out: output.stdout,
1092                    err: output.stderr,
1093                    telemetry: runner_telemetry(started_at, usage_before, usage_after),
1094                });
1095            }
1096            Ok(None) => {
1097                if Instant::now() >= deadline {
1098                    let _ = child.kill();
1099                    let _ = child.wait();
1100                    return Err(RuntimeFail {
1101                        kind: "timeout".to_owned(),
1102                        msg: "timeout".to_owned(),
1103                        code: None,
1104                    });
1105                }
1106                thread::sleep(Duration::from_millis(10));
1107            }
1108            Err(error) => {
1109                return Err(RuntimeFail {
1110                    kind: "io".to_owned(),
1111                    msg: error.to_string(),
1112                    code: None,
1113                })
1114            }
1115        }
1116    }
1117}
1118
1119fn runner_telemetry(
1120    started_at: Instant,
1121    usage_before: Option<ChildResourceUsage>,
1122    usage_after: Option<ChildResourceUsage>,
1123) -> Vec<TelemetryEntry> {
1124    let mut telemetry = vec![TelemetryEntry {
1125        name: "wall_time_ns".to_owned(),
1126        unit: Some("ns".to_owned()),
1127        value: crate::TelemetryValue::Metric {
1128            value: telemetry_number_from_u128(started_at.elapsed().as_nanos()),
1129        },
1130    }];
1131
1132    if let (Some(before), Some(after)) = (usage_before, usage_after) {
1133        telemetry.push(TelemetryEntry {
1134            name: "cpu_user_ns".to_owned(),
1135            unit: Some("ns".to_owned()),
1136            value: crate::TelemetryValue::Metric {
1137                value: Number::from(after.user_ns.saturating_sub(before.user_ns)),
1138            },
1139        });
1140        telemetry.push(TelemetryEntry {
1141            name: "cpu_system_ns".to_owned(),
1142            unit: Some("ns".to_owned()),
1143            value: crate::TelemetryValue::Metric {
1144                value: Number::from(after.system_ns.saturating_sub(before.system_ns)),
1145            },
1146        });
1147        telemetry.push(TelemetryEntry {
1148            name: "peak_rss_bytes".to_owned(),
1149            unit: Some("bytes".to_owned()),
1150            value: crate::TelemetryValue::Metric {
1151                value: Number::from(after.peak_rss_bytes),
1152            },
1153        });
1154    }
1155
1156    telemetry
1157}
1158
1159fn telemetry_number_from_u128(value: u128) -> Number {
1160    let value = value.min(u128::from(u64::MAX));
1161    Number::from(value as u64)
1162}
1163
/// Snapshot of child-process resource usage as produced by
/// `child_resource_usage`.
#[derive(Debug, Clone, Copy)]
struct ChildResourceUsage {
    // User-mode CPU time in nanoseconds.
    user_ns: u64,
    // System-mode CPU time in nanoseconds.
    system_ns: u64,
    // Peak resident set size in bytes.
    peak_rss_bytes: u64,
}
1170
1171#[cfg(unix)]
1172fn child_resource_usage() -> Option<ChildResourceUsage> {
1173    let mut usage = std::mem::MaybeUninit::<libc::rusage>::zeroed();
1174    let status = unsafe { libc::getrusage(libc::RUSAGE_CHILDREN, usage.as_mut_ptr()) };
1175    if status != 0 {
1176        return None;
1177    }
1178
1179    let usage = unsafe { usage.assume_init() };
1180    Some(ChildResourceUsage {
1181        user_ns: timeval_to_ns(usage.ru_utime),
1182        system_ns: timeval_to_ns(usage.ru_stime),
1183        peak_rss_bytes: peak_rss_bytes(usage.ru_maxrss),
1184    })
1185}
1186
/// Non-Unix fallback: no child resource usage is collected, so this always
/// returns `None`.
#[cfg(not(unix))]
fn child_resource_usage() -> Option<ChildResourceUsage> {
    None
}
1191
1192#[cfg(unix)]
1193fn timeval_to_ns(value: libc::timeval) -> u64 {
1194    let secs = u64::try_from(value.tv_sec).unwrap_or(0);
1195    let micros = u64::try_from(value.tv_usec).unwrap_or(0);
1196    secs.saturating_mul(1_000_000_000)
1197        .saturating_add(micros.saturating_mul(1_000))
1198}
1199
1200#[cfg(unix)]
1201fn peak_rss_bytes(value: libc::c_long) -> u64 {
1202    let value = u64::try_from(value).unwrap_or(0);
1203    #[cfg(target_os = "linux")]
1204    {
1205        value.saturating_mul(1024)
1206    }
1207    #[cfg(not(target_os = "linux"))]
1208    {
1209        value
1210    }
1211}
1212
1213fn append_action_telemetry_records(
1214    records: &mut Vec<ReportRecord>,
1215    case_id: &str,
1216    action_ix: usize,
1217    telemetry: &[TelemetryEntry],
1218) {
1219    for entry in telemetry {
1220        records.push(ReportRecord::Telemetry(TelemetryRecord::action(
1221            case_id.to_owned(),
1222            action_ix,
1223            entry.clone(),
1224        )));
1225    }
1226}
1227
1228fn evaluate_assert(predicate: &Predicate, state: &CaseState) -> ObserverResult<(Status, String)> {
1229    match predicate {
1230        Predicate::Fail { msg } => Ok((Status::Fail, msg.clone())),
1231        _ => {
1232            let pass = evaluate_predicate(predicate, state)?;
1233            if pass {
1234                Ok((Status::Pass, "expectation passed".to_owned()))
1235            } else {
1236                Ok((Status::Fail, "expectation failed".to_owned()))
1237            }
1238        }
1239    }
1240}
1241
1242fn evaluate_predicate(predicate: &Predicate, state: &CaseState) -> ObserverResult<bool> {
1243    match predicate {
1244        Predicate::Compare { op, left, right } => {
1245            let left = eval_value(left, &state.bindings, &state.artifacts)?;
1246            let right = eval_value(right, &state.bindings, &state.artifacts)?;
1247            compare_values(*op, &left, &right)
1248        }
1249        Predicate::IsStatus { left, status } => {
1250            let left = eval_value(left, &state.bindings, &state.artifacts)?;
1251            is_status_value(&left, *status)
1252        }
1253        Predicate::IsStatusClass { left, class } => {
1254            let left = eval_value(left, &state.bindings, &state.artifacts)?;
1255            is_status_class_value(&left, *class)
1256        }
1257        Predicate::HasHeader { left, name } => {
1258            let left = eval_value(left, &state.bindings, &state.artifacts)?;
1259            has_header_value(&left, name)
1260        }
1261        Predicate::Contains { left, right } => {
1262            let left = eval_value(left, &state.bindings, &state.artifacts)?;
1263            let right = eval_value(right, &state.bindings, &state.artifacts)?;
1264            contains_value(&left, &right)
1265        }
1266        Predicate::ContainsRegex { left, regex } | Predicate::Match { left, regex } => {
1267            let left = eval_value(left, &state.bindings, &state.artifacts)?;
1268            match left {
1269                RuntimeValue::String(value) => Ok(Regex::new(regex)
1270                    .map_err(|error| ObserverError::Runtime(error.to_string()))?
1271                    .is_match(&value)),
1272                RuntimeValue::Bytes(bytes) => match String::from_utf8(bytes) {
1273                    Ok(value) => Ok(Regex::new(regex)
1274                        .map_err(|error| ObserverError::Runtime(error.to_string()))?
1275                        .is_match(&value)),
1276                    Err(_) => Ok(false),
1277                },
1278                _ => Err(ObserverError::Runtime(
1279                    "regex predicates require string or bytes operands".to_owned(),
1280                )),
1281            }
1282        }
1283        Predicate::StartsWith { left, right } => {
1284            let left = eval_value(left, &state.bindings, &state.artifacts)?;
1285            let right = eval_value(right, &state.bindings, &state.artifacts)?;
1286            starts_with_value(&left, &right)
1287        }
1288        Predicate::EndsWith { left, right } => {
1289            let left = eval_value(left, &state.bindings, &state.artifacts)?;
1290            let right = eval_value(right, &state.bindings, &state.artifacts)?;
1291            ends_with_value(&left, &right)
1292        }
1293        Predicate::Fail { .. } => Ok(false),
1294    }
1295}
1296
1297fn compare_values(op: CompareOp, left: &RuntimeValue, right: &RuntimeValue) -> ObserverResult<bool> {
1298    match (left, right) {
1299        (RuntimeValue::Int(left), RuntimeValue::Int(right)) => Ok(match op {
1300            CompareOp::Eq => left == right,
1301            CompareOp::Ne => left != right,
1302            CompareOp::Lt => left < right,
1303            CompareOp::Le => left <= right,
1304            CompareOp::Gt => left > right,
1305            CompareOp::Ge => left >= right,
1306        }),
1307        (RuntimeValue::String(left), RuntimeValue::String(right)) => Ok(match op {
1308            CompareOp::Eq => left == right,
1309            CompareOp::Ne => left != right,
1310            _ => false,
1311        }),
1312        (RuntimeValue::Bytes(left), RuntimeValue::Bytes(right)) => Ok(match op {
1313            CompareOp::Eq => left == right,
1314            CompareOp::Ne => left != right,
1315            _ => false,
1316        }),
1317        (RuntimeValue::Bytes(left), RuntimeValue::String(right)) => compare_values(
1318            op,
1319            &RuntimeValue::Bytes(left.clone()),
1320            &RuntimeValue::Bytes(right.as_bytes().to_vec()),
1321        ),
1322        (RuntimeValue::String(left), RuntimeValue::Bytes(right)) => compare_values(
1323            op,
1324            &RuntimeValue::Bytes(left.as_bytes().to_vec()),
1325            &RuntimeValue::Bytes(right.clone()),
1326        ),
1327        _ => Err(ObserverError::Runtime(
1328            "unsupported operand types for comparison".to_owned(),
1329        )),
1330    }
1331}
1332
1333fn contains_value(left: &RuntimeValue, right: &RuntimeValue) -> ObserverResult<bool> {
1334    match (left, right) {
1335        (RuntimeValue::String(left), RuntimeValue::String(right)) => Ok(left.contains(right)),
1336        (RuntimeValue::Bytes(left), RuntimeValue::Bytes(right)) => {
1337            Ok(left.windows(right.len()).any(|window| window == right.as_slice()))
1338        }
1339        (RuntimeValue::Bytes(left), RuntimeValue::String(right)) => contains_value(
1340            &RuntimeValue::Bytes(left.clone()),
1341            &RuntimeValue::Bytes(right.as_bytes().to_vec()),
1342        ),
1343        (RuntimeValue::String(left), RuntimeValue::Bytes(right)) => contains_value(
1344            &RuntimeValue::Bytes(left.as_bytes().to_vec()),
1345            &RuntimeValue::Bytes(right.clone()),
1346        ),
1347        _ => Err(ObserverError::Runtime(
1348            "unsupported operand types for contains".to_owned(),
1349        )),
1350    }
1351}
1352
1353fn starts_with_value(left: &RuntimeValue, right: &RuntimeValue) -> ObserverResult<bool> {
1354    match (left, right) {
1355        (RuntimeValue::String(left), RuntimeValue::String(right)) => Ok(left.starts_with(right)),
1356        (RuntimeValue::Bytes(left), RuntimeValue::Bytes(right)) => Ok(left.starts_with(right)),
1357        (RuntimeValue::Bytes(left), RuntimeValue::String(right)) => starts_with_value(
1358            &RuntimeValue::Bytes(left.clone()),
1359            &RuntimeValue::Bytes(right.as_bytes().to_vec()),
1360        ),
1361        (RuntimeValue::String(left), RuntimeValue::Bytes(right)) => starts_with_value(
1362            &RuntimeValue::Bytes(left.as_bytes().to_vec()),
1363            &RuntimeValue::Bytes(right.clone()),
1364        ),
1365        _ => Err(ObserverError::Runtime(
1366            "unsupported operand types for startsWith".to_owned(),
1367        )),
1368    }
1369}
1370
1371fn ends_with_value(left: &RuntimeValue, right: &RuntimeValue) -> ObserverResult<bool> {
1372    match (left, right) {
1373        (RuntimeValue::String(left), RuntimeValue::String(right)) => Ok(left.ends_with(right)),
1374        (RuntimeValue::Bytes(left), RuntimeValue::Bytes(right)) => Ok(left.ends_with(right)),
1375        (RuntimeValue::Bytes(left), RuntimeValue::String(right)) => ends_with_value(
1376            &RuntimeValue::Bytes(left.clone()),
1377            &RuntimeValue::Bytes(right.as_bytes().to_vec()),
1378        ),
1379        (RuntimeValue::String(left), RuntimeValue::Bytes(right)) => ends_with_value(
1380            &RuntimeValue::Bytes(left.as_bytes().to_vec()),
1381            &RuntimeValue::Bytes(right.clone()),
1382        ),
1383        _ => Err(ObserverError::Runtime(
1384            "unsupported operand types for endsWith".to_owned(),
1385        )),
1386    }
1387}
1388
1389fn has_header_value(left: &RuntimeValue, name: &str) -> ObserverResult<bool> {
1390    let header_name = name.to_ascii_lowercase();
1391    match left {
1392        RuntimeValue::HttpResp(response) => Ok(response.headers.contains_key(&header_name)),
1393        _ => Err(ObserverError::Runtime(
1394            "hasHeader predicates require an http response operand".to_owned(),
1395        )),
1396    }
1397}
1398
1399fn is_status_value(left: &RuntimeValue, status: i64) -> ObserverResult<bool> {
1400    match left {
1401        RuntimeValue::HttpResp(response) => Ok(response.status == status),
1402        _ => Err(ObserverError::Runtime(
1403            "isStatus predicates require an http response operand".to_owned(),
1404        )),
1405    }
1406}
1407
1408fn is_status_class_value(left: &RuntimeValue, class: i64) -> ObserverResult<bool> {
1409    if !(1..=5).contains(&class) {
1410        return Err(ObserverError::Runtime(
1411            "isStatusClass requires a status class in the range 1..=5".to_owned(),
1412        ));
1413    }
1414    match left {
1415        RuntimeValue::HttpResp(response) => Ok(response.status / 100 == class),
1416        _ => Err(ObserverError::Runtime(
1417            "isStatusClass predicates require an http response operand".to_owned(),
1418        )),
1419    }
1420}
1421
1422fn eval_value(
1423    expr: &ValueExpr,
1424    bindings: &BTreeMap<String, RuntimeValue>,
1425    artifacts: &BTreeMap<String, ArtifactBinding>,
1426) -> ObserverResult<RuntimeValue> {
1427    match expr {
1428        ValueExpr::Binding { name } => bindings
1429            .get(name)
1430            .cloned()
1431            .ok_or_else(|| ObserverError::Runtime(format!("unbound identifier `{name}`"))),
1432        ValueExpr::String { value } => Ok(RuntimeValue::String(value.clone())),
1433        ValueExpr::BytesUtf8 { value } => Ok(RuntimeValue::Bytes(value.as_bytes().to_vec())),
1434        ValueExpr::Int { value } => Ok(RuntimeValue::Int(*value)),
1435        ValueExpr::Field { base, name } => {
1436            let value = eval_value(base, bindings, artifacts)?;
1437            match (value, name.as_str()) {
1438                (RuntimeValue::ProcRun(run), "exit") => Ok(RuntimeValue::Int(run.exit.into())),
1439                (RuntimeValue::ProcRun(run), "out") => Ok(RuntimeValue::Bytes(run.out)),
1440                (RuntimeValue::ProcRun(run), "err") => Ok(RuntimeValue::Bytes(run.err)),
1441                (RuntimeValue::HttpResp(response), "status") => Ok(RuntimeValue::Int(response.status)),
1442                (RuntimeValue::HttpResp(response), "body") => Ok(RuntimeValue::Bytes(response.body)),
1443                (RuntimeValue::TcpResp(response), "bytes") => Ok(RuntimeValue::Bytes(response.bytes)),
1444                (RuntimeValue::Object(fields), field_name) => fields.get(field_name).cloned().ok_or_else(|| {
1445                    ObserverError::Runtime(format!("unknown field `{field_name}`"))
1446                }),
1447                (RuntimeValue::Fail(fail), "kind") => Ok(RuntimeValue::String(fail.kind)),
1448                (RuntimeValue::Fail(fail), "msg") => Ok(RuntimeValue::String(fail.msg)),
1449                (RuntimeValue::Fail(fail), "code") => {
1450                    if let Some(code) = fail.code {
1451                        Ok(RuntimeValue::String(code))
1452                    } else {
1453                        Err(ObserverError::Runtime(
1454                            "optional fail field `code` is absent".to_owned(),
1455                        ))
1456                    }
1457                }
1458                _ => Err(ObserverError::Runtime(format!("unknown field `{name}`"))),
1459            }
1460        }
1461        ValueExpr::Header { base, name } => {
1462            let header_name = name.to_ascii_lowercase();
1463            let value = eval_value(base, bindings, artifacts)?;
1464            match value {
1465                RuntimeValue::HttpResp(response) => response
1466                    .headers
1467                    .get(&header_name)
1468                    .cloned()
1469                    .map(|value| RuntimeValue::String(value))
1470                    .ok_or_else(|| {
1471                        ObserverError::Runtime(format!(
1472                            "header `{header_name}` is absent"
1473                        ))
1474                    }),
1475                _ => Err(ObserverError::Runtime(
1476                    "header lookup requires an http response operand".to_owned(),
1477                )),
1478            }
1479        }
1480        ValueExpr::ArtifactPath { name } => artifacts
1481            .get(name)
1482            .map(|binding| RuntimeValue::String(binding.path.clone()))
1483            .ok_or_else(|| ObserverError::Runtime(format!("unknown artifact `{name}`"))),
1484        ValueExpr::JoinPath { parts } => {
1485            let mut strings = Vec::with_capacity(parts.len());
1486            for part in parts {
1487                let value = eval_value(part, bindings, artifacts)?;
1488                strings.push(as_string(value)?);
1489            }
1490            Ok(RuntimeValue::String(normalize_joined_path(&strings)))
1491        }
1492    }
1493}
1494
1495fn as_string(value: RuntimeValue) -> ObserverResult<String> {
1496    match value {
1497        RuntimeValue::String(value) => Ok(value),
1498        _ => Err(ObserverError::Runtime("expected string value".to_owned())),
1499    }
1500}
1501
1502fn as_bytes(value: RuntimeValue) -> ObserverResult<Vec<u8>> {
1503    match value {
1504        RuntimeValue::Bytes(value) => Ok(value),
1505        RuntimeValue::String(value) => Ok(value.into_bytes()),
1506        _ => Err(ObserverError::Runtime("expected bytes or string value".to_owned())),
1507    }
1508}
1509
1510fn proc_ok_payload(run: &ProcRun) -> serde_json::Value {
1511    let (out_len, out_preview, out_truncated) = preview_bytes(&run.out);
1512    let (err_len, err_preview, err_truncated) = preview_bytes(&run.err);
1513    let mut payload = json!({
1514        "exit": run.exit,
1515        "out_len": out_len,
1516        "err_len": err_len,
1517    });
1518    if let Some(preview) = out_preview {
1519        payload["out_preview_b64"] = json!(preview);
1520    }
1521    if let Some(preview) = err_preview {
1522        payload["err_preview_b64"] = json!(preview);
1523    }
1524    if let Some(truncated) = out_truncated {
1525        payload["out_truncated"] = json!(truncated);
1526    }
1527    if let Some(truncated) = err_truncated {
1528        payload["err_truncated"] = json!(truncated);
1529    }
1530    payload
1531}
1532
1533fn http_ok_payload(response: &HttpResp) -> serde_json::Value {
1534    let (body_len, body_preview, body_truncated) = preview_bytes(&response.body);
1535    let mut payload = json!({
1536        "status": response.status,
1537        "body_len": body_len,
1538        "headers": response.headers,
1539    });
1540    if let Some(preview) = body_preview {
1541        payload["body_preview_b64"] = json!(preview);
1542    }
1543    if let Some(truncated) = body_truncated {
1544        payload["body_truncated"] = json!(truncated);
1545    }
1546    payload
1547}
1548
1549fn tcp_ok_payload(response: &TcpResp) -> serde_json::Value {
1550    let (bytes_len, bytes_preview, bytes_truncated) = preview_bytes(&response.bytes);
1551    let mut payload = json!({
1552        "bytes_len": bytes_len,
1553    });
1554    if let Some(preview) = bytes_preview {
1555        payload["bytes_preview_b64"] = json!(preview);
1556    }
1557    if let Some(truncated) = bytes_truncated {
1558        payload["bytes_truncated"] = json!(truncated);
1559    }
1560    payload
1561}
1562
1563fn execute_http_get(url: &str, timeout_ms: u32) -> Result<HttpResp, RuntimeFail> {
1564    let client = reqwest::blocking::Client::builder()
1565        .timeout(Duration::from_millis(u64::from(timeout_ms)))
1566        .build()
1567        .map_err(|error| RuntimeFail {
1568            kind: "http".to_owned(),
1569            msg: error.to_string(),
1570            code: None,
1571        })?;
1572
1573    let response = client.get(url).send().map_err(http_error_to_runtime_fail)?;
1574    let status = i64::from(response.status().as_u16());
1575    let mut headers = BTreeMap::new();
1576    for (name, value) in response.headers() {
1577        let header_value = value.to_str().map_err(|error| RuntimeFail {
1578            kind: "http".to_owned(),
1579            msg: error.to_string(),
1580            code: None,
1581        })?;
1582        headers.insert(name.as_str().to_ascii_lowercase(), header_value.to_owned());
1583    }
1584    let body = response.bytes().map_err(http_error_to_runtime_fail)?.to_vec();
1585
1586    Ok(HttpResp {
1587        status,
1588        body,
1589        headers,
1590    })
1591}
1592
1593fn execute_tcp(
1594    address: &str,
1595    send: &[u8],
1596    recv_max: usize,
1597    timeout_ms: u32,
1598) -> Result<TcpResp, RuntimeFail> {
1599    let timeout = Duration::from_millis(u64::from(timeout_ms));
1600    let addrs = resolve_socket_addrs(address)?;
1601    let mut last_error = None;
1602
1603    for socket_addr in addrs {
1604        match TcpStream::connect_timeout(&socket_addr, timeout) {
1605            Ok(mut stream) => {
1606                stream
1607                    .set_read_timeout(Some(timeout))
1608                    .map_err(io_runtime_fail)?;
1609                stream
1610                    .set_write_timeout(Some(timeout))
1611                    .map_err(io_runtime_fail)?;
1612                stream.write_all(send).map_err(io_runtime_fail)?;
1613
1614                let mut bytes = Vec::new();
1615                let mut buffer = vec![0u8; recv_max.min(4096).max(1)];
1616                while bytes.len() < recv_max {
1617                    let remaining = recv_max - bytes.len();
1618                    let read_len = remaining.min(buffer.len());
1619                    match stream.read(&mut buffer[..read_len]) {
1620                        Ok(0) => break,
1621                        Ok(count) => bytes.extend_from_slice(&buffer[..count]),
1622                        Err(error)
1623                            if error.kind() == std::io::ErrorKind::WouldBlock
1624                                || error.kind() == std::io::ErrorKind::TimedOut =>
1625                        {
1626                            if bytes.is_empty() {
1627                                return Err(RuntimeFail {
1628                                    kind: "timeout".to_owned(),
1629                                    msg: "timeout".to_owned(),
1630                                    code: None,
1631                                });
1632                            }
1633                            break;
1634                        }
1635                        Err(error) => return Err(io_runtime_fail(error)),
1636                    }
1637                }
1638
1639                return Ok(TcpResp { bytes });
1640            }
1641            Err(error) => last_error = Some(error),
1642        }
1643    }
1644
1645    Err(last_error.map(io_runtime_fail).unwrap_or(RuntimeFail {
1646        kind: "tcp".to_owned(),
1647        msg: format!("no socket addresses resolved for `{address}`"),
1648        code: None,
1649    }))
1650}
1651
1652fn resolve_socket_addrs(address: &str) -> Result<Vec<SocketAddr>, RuntimeFail> {
1653    let mut addrs = address.to_socket_addrs().map_err(io_runtime_fail)?.collect::<Vec<_>>();
1654    addrs.sort_by(|left, right| left.to_string().as_bytes().cmp(right.to_string().as_bytes()));
1655    addrs.dedup();
1656    Ok(addrs)
1657}
1658
1659fn http_error_to_runtime_fail(error: reqwest::Error) -> RuntimeFail {
1660    let kind = if error.is_timeout() {
1661        "timeout"
1662    } else {
1663        "http"
1664    };
1665    RuntimeFail {
1666        kind: kind.to_owned(),
1667        msg: error.to_string(),
1668        code: error.status().map(|status| status.as_u16().to_string()),
1669    }
1670}
1671
1672fn io_runtime_fail(error: std::io::Error) -> RuntimeFail {
1673    RuntimeFail {
1674        kind: "io".to_owned(),
1675        msg: error.to_string(),
1676        code: None,
1677    }
1678}
1679
1680fn to_runtime_fail(error: ObserverError) -> RuntimeFail {
1681    match error {
1682        ObserverError::Runtime(msg) if msg == "timeout" => RuntimeFail {
1683            kind: "timeout".to_owned(),
1684            msg,
1685            code: None,
1686        },
1687        other => RuntimeFail {
1688            kind: "runtime".to_owned(),
1689            msg: other.to_string(),
1690            code: None,
1691        },
1692    }
1693}