1use crate::error::{ObserverError, ObserverResult};
5use crate::inventory::{Inventory, InventoryEntry, InventoryRunner};
6use crate::normalize::{normalized_inventory_sha256, normalized_suite_sha256};
7use crate::providers::{decode_b64_bytes, run_provider_target, ResolvedProviders};
8use crate::report::{
9 derive_case_id, preview_bytes, ActionFail, ActionRecord, ActionStatus, ArtifactRecord,
10 AssertRecord, CaseRecord, ExtractRecord, ReportHeader, ReportMode, ReportRecord, Status, SummaryRecord,
11 TelemetryEntry, TelemetryRecord,
12};
13use crate::suite::{
14 CaseKeyField, CaseSource, CompareOp, Predicate, ResultExpr, SelectionMode, Selector, Statement,
15 SuiteCore, ValueExpr,
16};
17use regex::Regex;
18use serde_json::json;
19use serde_json::Number;
20use std::collections::BTreeMap;
21use std::fs;
22use std::io::{Read, Write};
23use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
24use std::path::{Path, PathBuf};
25use std::process::{Command, Stdio};
26use std::thread;
27use std::time::{Duration, Instant};
28
29#[derive(Debug, Clone)]
30pub struct ExecutionOutcome {
31 pub records: Vec<ReportRecord>,
32 pub exit_code: i32,
33}
34
35#[derive(Debug, Clone, Default, PartialEq, Eq)]
36pub struct ExecutionOptions {
37 pub emit_telemetry: bool,
38 pub filter_test_name: Option<String>,
39}
40
41#[derive(Debug, Clone, PartialEq)]
42struct ProcRun {
43 exit: i32,
44 out: Vec<u8>,
45 err: Vec<u8>,
46 telemetry: Vec<TelemetryEntry>,
47}
48
49#[derive(Debug, Clone, PartialEq)]
50struct HttpResp {
51 status: i64,
52 body: Vec<u8>,
53 headers: BTreeMap<String, String>,
54}
55
56#[derive(Debug, Clone, PartialEq)]
57struct TcpResp {
58 bytes: Vec<u8>,
59}
60
61#[derive(Debug, Clone, PartialEq)]
62struct RuntimeFail {
63 kind: String,
64 msg: String,
65 code: Option<String>,
66}
67
68#[derive(Debug, Clone, PartialEq)]
69enum RuntimeValue {
70 Int(i64),
71 String(String),
72 Bytes(Vec<u8>),
73 Object(BTreeMap<String, RuntimeValue>),
74 ProcRun(ProcRun),
75 HttpResp(HttpResp),
76 TcpResp(TcpResp),
77 Fail(RuntimeFail),
78}
79
80#[derive(Debug, Clone, PartialEq, Eq)]
81struct ArtifactBinding {
82 name: String,
83 kind: String,
84 path: String,
85}
86
87#[derive(Debug, Clone, PartialEq)]
88struct CaseInstance {
89 case_key: String,
90 case_value: RuntimeValue,
91 test_name: String,
92}
93
94#[derive(Debug, Default)]
95struct CaseState {
96 bindings: BTreeMap<String, RuntimeValue>,
97 artifacts: BTreeMap<String, ArtifactBinding>,
98 action_ix: usize,
99 artifact_ix: usize,
100 extract_ix: usize,
101 assert_ix: usize,
102 assert_pass: usize,
103 assert_fail: usize,
104 unhandled_action_fail: usize,
105 records: Vec<ReportRecord>,
106}
107
108pub fn execute_suite(
109 inventory: &Inventory,
110 suite: &SuiteCore,
111 mode: ReportMode,
112) -> ObserverResult<ExecutionOutcome> {
113 execute_suite_with_options(inventory, suite, mode, ExecutionOptions::default())
114}
115
116pub fn execute_suite_with_options(
117 inventory: &Inventory,
118 suite: &SuiteCore,
119 mode: ReportMode,
120 options: ExecutionOptions,
121) -> ObserverResult<ExecutionOutcome> {
122 execute_suite_with_providers_and_options(inventory, suite, mode, None, options)
123}
124
125pub fn execute_suite_with_providers(
126 inventory: &Inventory,
127 suite: &SuiteCore,
128 mode: ReportMode,
129 providers: Option<&ResolvedProviders>,
130) -> ObserverResult<ExecutionOutcome> {
131 execute_suite_with_providers_and_options(
132 inventory,
133 suite,
134 mode,
135 providers,
136 ExecutionOptions::default(),
137 )
138}
139
140pub fn execute_suite_with_providers_and_options(
141 inventory: &Inventory,
142 suite: &SuiteCore,
143 mode: ReportMode,
144 providers: Option<&ResolvedProviders>,
145 options: ExecutionOptions,
146) -> ObserverResult<ExecutionOutcome> {
147 let inventory_sha256 = normalized_inventory_sha256(inventory)?;
148 let suite_sha256 = normalized_suite_sha256(suite)?;
149
150 let mut records = vec![ReportRecord::Header(ReportHeader::new(
151 inventory_sha256,
152 suite_sha256,
153 mode,
154 ))];
155
156 let mut case_pass = 0usize;
157 let mut case_fail = 0usize;
158 let mut assert_pass = 0usize;
159 let mut assert_fail = 0usize;
160
161 for item in &suite.items {
162 let mut selected = discover_cases(inventory, &item.case_source)?;
163 if let Some(filter) = options.filter_test_name.as_deref() {
164 selected.retain(|case| case.case_key == filter || case.test_name == filter);
165 }
166 if selected.is_empty() && matches!(item.selection_mode, SelectionMode::Required) {
167 if options.filter_test_name.is_some() {
168 continue;
169 }
170 return Err(ObserverError::Runtime(format!(
171 "required case source for item `{}` produced no cases",
172 item.item_id
173 )));
174 }
175
176 for case in selected {
177 let case_id = derive_case_id(&item.item_id, &case.case_key);
178 let mut state = CaseState::default();
179 state
180 .bindings
181 .insert(item.case_binding.clone(), case.case_value.clone());
182
183 for statement in &item.body {
184 evaluate_statement(statement, inventory, providers, &case_id, &mut state, &options)?;
185 }
186
187 let status = if state.assert_fail == 0 && state.unhandled_action_fail == 0 {
188 case_pass += 1;
189 Status::Pass
190 } else {
191 case_fail += 1;
192 Status::Fail
193 };
194
195 assert_pass += state.assert_pass;
196 assert_fail += state.assert_fail;
197 records.append(&mut state.records);
198 records.push(ReportRecord::Case(CaseRecord {
199 case_id,
200 item_id: item.item_id.clone(),
201 test_name: case.test_name,
202 case_key: match item.case_source {
203 CaseSource::Inventory { .. } => None,
204 CaseSource::Files { .. } => Some(case.case_key),
205 },
206 status,
207 assert_pass: state.assert_pass,
208 assert_fail: state.assert_fail,
209 unhandled_action_fail: state.unhandled_action_fail,
210 }));
211 }
212 }
213
214 let exit_code = if case_fail == 0 { 0 } else { 1 };
215 records.push(ReportRecord::Summary(SummaryRecord {
216 case_pass,
217 case_fail,
218 assert_pass,
219 assert_fail,
220 exit_code,
221 }));
222
223 Ok(ExecutionOutcome { records, exit_code })
224}
225
226fn discover_cases(inventory: &Inventory, case_source: &CaseSource) -> ObserverResult<Vec<CaseInstance>> {
227 match case_source {
228 CaseSource::Inventory { selector } => {
229 let mut names = Vec::new();
230 for entry in &inventory.entries {
231 if matches_selector(&entry.name, selector)? {
232 names.push(entry.name.clone());
233 }
234 }
235 names.sort_by(|left, right| left.as_bytes().cmp(right.as_bytes()));
236 Ok(names
237 .into_iter()
238 .map(|name| CaseInstance {
239 case_key: name.clone(),
240 case_value: RuntimeValue::String(name.clone()),
241 test_name: name,
242 })
243 .collect())
244 }
245 CaseSource::Files {
246 root,
247 glob,
248 key_field,
249 } => discover_file_cases(root, glob, *key_field),
250 }
251}
252
253fn matches_selector(name: &str, selector: &Selector) -> ObserverResult<bool> {
254 match selector {
255 Selector::Exact { value } => Ok(name == value),
256 Selector::Prefix { value } => Ok(name.starts_with(value)),
257 Selector::Glob { value } => Ok(matches_glob(name, value)),
258 Selector::Regex { value } => Regex::new(value)
259 .map(|regex| regex.is_match(name))
260 .map_err(|error| ObserverError::Runtime(format!("invalid selector regex: {error}"))),
261 Selector::Any { items } => {
262 for item in items {
263 if matches_selector(name, item)? {
264 return Ok(true);
265 }
266 }
267 Ok(false)
268 }
269 Selector::All { items } => {
270 for item in items {
271 if !matches_selector(name, item)? {
272 return Ok(false);
273 }
274 }
275 Ok(true)
276 }
277 }
278}
279
280fn matches_glob(name: &str, pattern: &str) -> bool {
281 matches_glob_bytes(name.as_bytes(), pattern.as_bytes())
282}
283
284fn discover_file_cases(root: &str, glob: &str, key_field: CaseKeyField) -> ObserverResult<Vec<CaseInstance>> {
285 let root_path = PathBuf::from(root);
286 let mut rel_paths = Vec::new();
287 collect_relative_files(&root_path, &root_path, &mut rel_paths)?;
288 rel_paths.sort_by(|left, right| left.as_bytes().cmp(right.as_bytes()));
289
290 let mut cases = Vec::new();
291 for rel_path in rel_paths {
292 if !matches_glob(&rel_path, glob) {
293 continue;
294 }
295 let path = Path::new(&rel_path);
296 let name = path
297 .file_name()
298 .and_then(|value| value.to_str())
299 .ok_or_else(|| ObserverError::Runtime(format!("invalid discovered file name `{rel_path}`")))?
300 .to_owned();
301 let stem = path
302 .file_stem()
303 .and_then(|value| value.to_str())
304 .ok_or_else(|| ObserverError::Runtime(format!("invalid discovered file stem `{rel_path}`")))?
305 .to_owned();
306 let ext = path
307 .extension()
308 .and_then(|value| value.to_str())
309 .unwrap_or("")
310 .to_owned();
311 let case_key = match key_field {
312 CaseKeyField::Path => rel_path.clone(),
313 CaseKeyField::Name => name.clone(),
314 CaseKeyField::Stem => stem.clone(),
315 };
316 let mut fields = BTreeMap::new();
317 fields.insert("key".to_owned(), RuntimeValue::String(case_key.clone()));
318 fields.insert("path".to_owned(), RuntimeValue::String(rel_path.clone()));
319 fields.insert("name".to_owned(), RuntimeValue::String(name));
320 fields.insert("stem".to_owned(), RuntimeValue::String(stem));
321 fields.insert("ext".to_owned(), RuntimeValue::String(ext));
322 cases.push(CaseInstance {
323 case_key: case_key.clone(),
324 test_name: case_key,
325 case_value: RuntimeValue::Object(fields),
326 });
327 }
328 Ok(cases)
329}
330
331fn collect_relative_files(root: &Path, current: &Path, out: &mut Vec<String>) -> ObserverResult<()> {
332 let entries = fs::read_dir(current)
333 .map_err(|error| ObserverError::Runtime(format!("failed to read `{}`: {error}", current.display())))?;
334 let mut children = Vec::new();
335 for entry in entries {
336 let entry = entry
337 .map_err(|error| ObserverError::Runtime(format!("failed to enumerate `{}`: {error}", current.display())))?;
338 children.push(entry.path());
339 }
340 children.sort_by(|left, right| left.as_os_str().as_encoded_bytes().cmp(right.as_os_str().as_encoded_bytes()));
341
342 for path in children {
343 let metadata = fs::metadata(&path)
344 .map_err(|error| ObserverError::Runtime(format!("failed to stat `{}`: {error}", path.display())))?;
345 if metadata.is_dir() {
346 collect_relative_files(root, &path, out)?;
347 } else if metadata.is_file() {
348 let rel = path
349 .strip_prefix(root)
350 .map_err(|error| ObserverError::Runtime(format!("failed to relativize `{}`: {error}", path.display())))?;
351 out.push(normalize_relative_path(rel));
352 }
353 }
354 Ok(())
355}
356
357fn normalize_relative_path(path: &Path) -> String {
358 path.components()
359 .map(|component| component.as_os_str().to_string_lossy().into_owned())
360 .collect::<Vec<_>>()
361 .join("/")
362}
363
364fn matches_glob_bytes(text: &[u8], pattern: &[u8]) -> bool {
365 if pattern.is_empty() {
366 return text.is_empty();
367 }
368 if pattern.len() >= 3 && pattern[0] == b'*' && pattern[1] == b'*' && pattern[2] == b'/' {
369 if matches_glob_bytes(text, &pattern[3..]) {
370 return true;
371 }
372 }
373 match pattern[0] {
374 b'*' => {
375 for offset in 0..=text.len() {
376 if matches_glob_bytes(&text[offset..], &pattern[1..]) {
377 return true;
378 }
379 }
380 false
381 }
382 b'?' => !text.is_empty() && matches_glob_bytes(&text[1..], &pattern[1..]),
383 byte => !text.is_empty() && text[0] == byte && matches_glob_bytes(&text[1..], &pattern[1..]),
384 }
385}
386
387fn evaluate_statement(
388 statement: &Statement,
389 inventory: &Inventory,
390 providers: Option<&ResolvedProviders>,
391 case_id: &str,
392 state: &mut CaseState,
393 options: &ExecutionOptions,
394) -> ObserverResult<()> {
395 match statement {
396 Statement::Assert { predicate } => {
397 let (status, msg) = evaluate_assert(predicate, state)?;
398 let record = ReportRecord::Assert(AssertRecord {
399 case_id: case_id.to_owned(),
400 assert_ix: state.assert_ix,
401 status,
402 msg,
403 });
404 state.assert_ix += 1;
405 match status {
406 Status::Pass => state.assert_pass += 1,
407 Status::Fail => state.assert_fail += 1,
408 }
409 state.records.push(record);
410 Ok(())
411 }
412 Statement::Publish {
413 name,
414 artifact_kind,
415 path,
416 } => {
417 let action_ix = state.action_ix;
418 state.action_ix += 1;
419 let path = as_string(eval_value(path, &state.bindings, &state.artifacts)?)?;
420 match validate_artifact_path(&path, artifact_kind) {
421 Ok(()) => {
422 let binding = ArtifactBinding {
423 name: name.clone(),
424 kind: artifact_kind.clone(),
425 path: path.clone(),
426 };
427 state.artifacts.insert(name.clone(), binding);
428 state.records.push(ReportRecord::Action(ActionRecord {
429 case_id: case_id.to_owned(),
430 action_ix,
431 action: "artifact_publish".to_owned(),
432 status: ActionStatus::Ok,
433 args: json!({"name": name, "kind": artifact_kind, "path": path}),
434 ok: Some(json!({"name": name, "kind": artifact_kind, "path": path})),
435 fail: None,
436 }));
437 state.records.push(ReportRecord::Artifact(ArtifactRecord {
438 case_id: case_id.to_owned(),
439 artifact_ix: state.artifact_ix,
440 action_ix,
441 name: name.clone(),
442 event: "publish".to_owned(),
443 kind: artifact_kind.clone(),
444 status: ActionStatus::Ok,
445 location: Some(json!({"path": path})),
446 fail: None,
447 }));
448 state.artifact_ix += 1;
449 Ok(())
450 }
451 Err(fail) => {
452 state.records.push(ReportRecord::Action(ActionRecord {
453 case_id: case_id.to_owned(),
454 action_ix,
455 action: "artifact_publish".to_owned(),
456 status: ActionStatus::Fail,
457 args: json!({"name": name, "kind": artifact_kind, "path": path}),
458 ok: None,
459 fail: Some(ActionFail {
460 kind: fail.kind.clone(),
461 msg: fail.msg.clone(),
462 code: fail.code.clone(),
463 }),
464 }));
465 state.records.push(ReportRecord::Artifact(ArtifactRecord {
466 case_id: case_id.to_owned(),
467 artifact_ix: state.artifact_ix,
468 action_ix,
469 name: name.clone(),
470 event: "publish".to_owned(),
471 kind: artifact_kind.clone(),
472 status: ActionStatus::Fail,
473 location: Some(json!({"path": path})),
474 fail: Some(ActionFail {
475 kind: fail.kind.clone(),
476 msg: fail.msg.clone(),
477 code: fail.code.clone(),
478 }),
479 }));
480 state.artifact_ix += 1;
481 Err(ObserverError::Runtime(format!("{}: {}", fail.kind, fail.msg)))
482 }
483 }
484 }
485 Statement::ResultBranch {
486 result,
487 ok_binding,
488 ok,
489 fail_binding,
490 fail,
491 } => {
492 match execute_result(result, inventory, providers, case_id, state, options) {
493 Ok(value) => {
494 let previous = state.bindings.insert(ok_binding.clone(), value);
495 for nested in ok {
496 evaluate_statement(nested, inventory, providers, case_id, state, options)?;
497 }
498 restore_binding(&mut state.bindings, ok_binding, previous);
499 }
500 Err(runtime_fail) => {
501 if fail.is_empty() {
502 state.unhandled_action_fail += 1;
503 } else if let Some(binding) = fail_binding {
504 let previous = state
505 .bindings
506 .insert(binding.clone(), RuntimeValue::Fail(runtime_fail));
507 for nested in fail {
508 evaluate_statement(nested, inventory, providers, case_id, state, options)?;
509 }
510 restore_binding(&mut state.bindings, binding, previous);
511 }
512 }
513 }
514 Ok(())
515 }
516 Statement::BoolBranch {
517 predicate,
518 if_true,
519 if_false,
520 } => {
521 if evaluate_predicate(predicate, state)? {
522 for nested in if_true {
523 evaluate_statement(nested, inventory, providers, case_id, state, options)?;
524 }
525 } else {
526 for nested in if_false {
527 evaluate_statement(nested, inventory, providers, case_id, state, options)?;
528 }
529 }
530 Ok(())
531 }
532 }
533}
534
535fn restore_binding(
536 bindings: &mut BTreeMap<String, RuntimeValue>,
537 key: &str,
538 previous: Option<RuntimeValue>,
539) {
540 if let Some(value) = previous {
541 bindings.insert(key.to_owned(), value);
542 } else {
543 bindings.remove(key);
544 }
545}
546
547fn execute_result(
548 result: &ResultExpr,
549 inventory: &Inventory,
550 providers: Option<&ResolvedProviders>,
551 case_id: &str,
552 state: &mut CaseState,
553 options: &ExecutionOptions,
554) -> Result<RuntimeValue, RuntimeFail> {
555 match result {
556 ResultExpr::Run { test, timeout_ms } => {
557 let test_name = as_string(eval_value(test, &state.bindings, &state.artifacts).map_err(to_runtime_fail)?)
558 .map_err(to_runtime_fail)?;
559 let entry = inventory
560 .entries
561 .iter()
562 .find(|entry| entry.name == test_name)
563 .ok_or_else(|| RuntimeFail {
564 kind: "unknown-test".to_owned(),
565 msg: "unknown test".to_owned(),
566 code: None,
567 })?;
568 let action_ix = state.action_ix;
569 state.action_ix += 1;
570 match execute_inventory_entry(entry, providers, *timeout_ms) {
571 Ok(run) => {
572 let telemetry = run.telemetry.clone();
573 state.records.push(ReportRecord::Action(ActionRecord {
574 case_id: case_id.to_owned(),
575 action_ix,
576 action: "run".to_owned(),
577 status: ActionStatus::Ok,
578 args: json!({"test_name": test_name, "timeout_ms": timeout_ms}),
579 ok: Some(proc_ok_payload(&run)),
580 fail: None,
581 }));
582 if options.emit_telemetry {
583 append_action_telemetry_records(&mut state.records, case_id, action_ix, &telemetry);
584 }
585 Ok(RuntimeValue::ProcRun(run))
586 }
587 Err(fail) => {
588 state.records.push(ReportRecord::Action(ActionRecord {
589 case_id: case_id.to_owned(),
590 action_ix,
591 action: "run".to_owned(),
592 status: ActionStatus::Fail,
593 args: json!({"test_name": test_name, "timeout_ms": timeout_ms}),
594 ok: None,
595 fail: Some(ActionFail {
596 kind: fail.kind.clone(),
597 msg: fail.msg.clone(),
598 code: fail.code.clone(),
599 }),
600 }));
601 Err(fail)
602 }
603 }
604 }
605 ResultExpr::Proc {
606 path,
607 args,
608 timeout_ms,
609 } => {
610 let path = as_string(eval_value(path, &state.bindings, &state.artifacts).map_err(to_runtime_fail)?)
611 .map_err(to_runtime_fail)?;
612 let resolved_args = evaluate_proc_args(args, &state.bindings, &state.artifacts)?;
613 let action_ix = state.action_ix;
614 state.action_ix += 1;
615 match execute_process(&path, &resolved_args, *timeout_ms) {
616 Ok(run) => {
617 let telemetry = run.telemetry.clone();
618 state.records.push(ReportRecord::Action(ActionRecord {
619 case_id: case_id.to_owned(),
620 action_ix,
621 action: "proc".to_owned(),
622 status: ActionStatus::Ok,
623 args: json!({"path": path, "args": resolved_args, "timeout_ms": timeout_ms}),
624 ok: Some(proc_ok_payload(&run)),
625 fail: None,
626 }));
627 if options.emit_telemetry {
628 append_action_telemetry_records(&mut state.records, case_id, action_ix, &telemetry);
629 }
630 Ok(RuntimeValue::ProcRun(run))
631 }
632 Err(fail) => {
633 state.records.push(ReportRecord::Action(ActionRecord {
634 case_id: case_id.to_owned(),
635 action_ix,
636 action: "proc".to_owned(),
637 status: ActionStatus::Fail,
638 args: json!({"path": path, "args": resolved_args, "timeout_ms": timeout_ms}),
639 ok: None,
640 fail: Some(ActionFail {
641 kind: fail.kind.clone(),
642 msg: fail.msg.clone(),
643 code: fail.code.clone(),
644 }),
645 }));
646 Err(fail)
647 }
648 }
649 }
650 ResultExpr::HttpGet { url, timeout_ms } => {
651 let url = as_string(eval_value(url, &state.bindings, &state.artifacts).map_err(to_runtime_fail)?)
652 .map_err(to_runtime_fail)?;
653 let action_ix = state.action_ix;
654 state.action_ix += 1;
655 match execute_http_get(&url, *timeout_ms) {
656 Ok(response) => {
657 state.records.push(ReportRecord::Action(ActionRecord {
658 case_id: case_id.to_owned(),
659 action_ix,
660 action: "http_get".to_owned(),
661 status: ActionStatus::Ok,
662 args: json!({"url": url, "timeout_ms": timeout_ms}),
663 ok: Some(http_ok_payload(&response)),
664 fail: None,
665 }));
666 Ok(RuntimeValue::HttpResp(response))
667 }
668 Err(fail) => {
669 state.records.push(ReportRecord::Action(ActionRecord {
670 case_id: case_id.to_owned(),
671 action_ix,
672 action: "http_get".to_owned(),
673 status: ActionStatus::Fail,
674 args: json!({"url": url, "timeout_ms": timeout_ms}),
675 ok: None,
676 fail: Some(ActionFail {
677 kind: fail.kind.clone(),
678 msg: fail.msg.clone(),
679 code: fail.code.clone(),
680 }),
681 }));
682 Err(fail)
683 }
684 }
685 }
686 ResultExpr::Tcp {
687 address,
688 send,
689 recv_max,
690 timeout_ms,
691 } => {
692 let address = as_string(eval_value(address, &state.bindings, &state.artifacts).map_err(to_runtime_fail)?)
693 .map_err(to_runtime_fail)?;
694 let send = as_bytes(eval_value(send, &state.bindings, &state.artifacts).map_err(to_runtime_fail)?)
695 .map_err(to_runtime_fail)?;
696 let action_ix = state.action_ix;
697 state.action_ix += 1;
698 match execute_tcp(&address, &send, *recv_max as usize, *timeout_ms) {
699 Ok(response) => {
700 state.records.push(ReportRecord::Action(ActionRecord {
701 case_id: case_id.to_owned(),
702 action_ix,
703 action: "tcp".to_owned(),
704 status: ActionStatus::Ok,
705 args: json!({
706 "address": address,
707 "send_len": send.len(),
708 "recv_max": recv_max,
709 "timeout_ms": timeout_ms
710 }),
711 ok: Some(tcp_ok_payload(&response)),
712 fail: None,
713 }));
714 Ok(RuntimeValue::TcpResp(response))
715 }
716 Err(fail) => {
717 state.records.push(ReportRecord::Action(ActionRecord {
718 case_id: case_id.to_owned(),
719 action_ix,
720 action: "tcp".to_owned(),
721 status: ActionStatus::Fail,
722 args: json!({
723 "address": address,
724 "send_len": send.len(),
725 "recv_max": recv_max,
726 "timeout_ms": timeout_ms
727 }),
728 ok: None,
729 fail: Some(ActionFail {
730 kind: fail.kind.clone(),
731 msg: fail.msg.clone(),
732 code: fail.code.clone(),
733 }),
734 }));
735 Err(fail)
736 }
737 }
738 }
739 ResultExpr::ArtifactCheck { name, artifact_kind } => {
740 let action_ix = state.action_ix;
741 state.action_ix += 1;
742 match check_artifact(name, artifact_kind, &state.artifacts) {
743 Ok(binding) => {
744 state.records.push(ReportRecord::Action(ActionRecord {
745 case_id: case_id.to_owned(),
746 action_ix,
747 action: "artifact_check".to_owned(),
748 status: ActionStatus::Ok,
749 args: json!({"name": name, "kind": artifact_kind}),
750 ok: Some(json!({"name": binding.name, "kind": binding.kind, "path": binding.path})),
751 fail: None,
752 }));
753 state.records.push(ReportRecord::Artifact(ArtifactRecord {
754 case_id: case_id.to_owned(),
755 artifact_ix: state.artifact_ix,
756 action_ix,
757 name: name.clone(),
758 event: "check".to_owned(),
759 kind: artifact_kind.clone(),
760 status: ActionStatus::Ok,
761 location: Some(json!({"path": binding.path})),
762 fail: None,
763 }));
764 state.artifact_ix += 1;
765 Ok(artifact_binding_value(&binding))
766 }
767 Err(fail) => {
768 state.records.push(ReportRecord::Action(ActionRecord {
769 case_id: case_id.to_owned(),
770 action_ix,
771 action: "artifact_check".to_owned(),
772 status: ActionStatus::Fail,
773 args: json!({"name": name, "kind": artifact_kind}),
774 ok: None,
775 fail: Some(ActionFail {
776 kind: fail.kind.clone(),
777 msg: fail.msg.clone(),
778 code: fail.code.clone(),
779 }),
780 }));
781 state.records.push(ReportRecord::Artifact(ArtifactRecord {
782 case_id: case_id.to_owned(),
783 artifact_ix: state.artifact_ix,
784 action_ix,
785 name: name.clone(),
786 event: "check".to_owned(),
787 kind: artifact_kind.clone(),
788 status: ActionStatus::Fail,
789 location: None,
790 fail: Some(ActionFail {
791 kind: fail.kind.clone(),
792 msg: fail.msg.clone(),
793 code: fail.code.clone(),
794 }),
795 }));
796 state.artifact_ix += 1;
797 Err(fail)
798 }
799 }
800 }
801 ResultExpr::ExtractJson { name, select } => {
802 execute_extract(name, select, false, case_id, state)
803 }
804 ResultExpr::ExtractJsonl { name, select } => {
805 execute_extract(name, select, true, case_id, state)
806 }
807 }
808}
809
810fn evaluate_proc_args(
811 args: &[ValueExpr],
812 bindings: &BTreeMap<String, RuntimeValue>,
813 artifacts: &BTreeMap<String, ArtifactBinding>,
814) -> Result<Vec<String>, RuntimeFail> {
815 let mut resolved = Vec::with_capacity(args.len());
816 for arg in args {
817 let value = eval_value(arg, bindings, artifacts).map_err(to_runtime_fail)?;
818 resolved.push(as_string(value).map_err(to_runtime_fail)?);
819 }
820 Ok(resolved)
821}
822
823fn validate_artifact_path(path: &str, artifact_kind: &str) -> Result<(), RuntimeFail> {
824 let metadata = fs::metadata(path).map_err(|error| RuntimeFail {
825 kind: "artifact-missing".to_owned(),
826 msg: format!("artifact `{path}` is missing: {error}"),
827 code: None,
828 })?;
829 match artifact_kind {
830 "directory" => {
831 if metadata.is_dir() {
832 Ok(())
833 } else {
834 Err(RuntimeFail {
835 kind: "artifact-kind".to_owned(),
836 msg: format!("artifact `{path}` is not a directory"),
837 code: None,
838 })
839 }
840 }
841 _ => {
842 if metadata.is_file() {
843 Ok(())
844 } else {
845 Err(RuntimeFail {
846 kind: "artifact-kind".to_owned(),
847 msg: format!("artifact `{path}` is not a file"),
848 code: None,
849 })
850 }
851 }
852 }
853}
854
855fn check_artifact(
856 name: &str,
857 artifact_kind: &str,
858 artifacts: &BTreeMap<String, ArtifactBinding>,
859) -> Result<ArtifactBinding, RuntimeFail> {
860 let binding = artifacts.get(name).cloned().ok_or_else(|| RuntimeFail {
861 kind: "artifact-missing".to_owned(),
862 msg: format!("unknown artifact `{name}`"),
863 code: None,
864 })?;
865 if binding.kind != artifact_kind {
866 return Err(RuntimeFail {
867 kind: "artifact-kind".to_owned(),
868 msg: format!(
869 "artifact `{name}` has kind `{}`, expected `{artifact_kind}`",
870 binding.kind
871 ),
872 code: None,
873 });
874 }
875 validate_artifact_path(&binding.path, artifact_kind)?;
876 Ok(binding)
877}
878
879fn artifact_binding_value(binding: &ArtifactBinding) -> RuntimeValue {
880 RuntimeValue::Object(BTreeMap::from([
881 ("name".to_owned(), RuntimeValue::String(binding.name.clone())),
882 ("kind".to_owned(), RuntimeValue::String(binding.kind.clone())),
883 ("path".to_owned(), RuntimeValue::String(binding.path.clone())),
884 ]))
885}
886
887fn execute_extract(
888 name: &str,
889 select: &str,
890 jsonl: bool,
891 case_id: &str,
892 state: &mut CaseState,
893) -> Result<RuntimeValue, RuntimeFail> {
894 let binding = check_artifact(name, if jsonl { "jsonl" } else { "json" }, &state.artifacts)?;
895 let action_ix = state.action_ix;
896 state.action_ix += 1;
897 match extract_value(&binding.path, select, jsonl) {
898 Ok((match_count, value_text)) => {
899 let action_name = if jsonl { "extract_jsonl" } else { "extract_json" };
900 let format = if jsonl { "jsonl" } else { "json" };
901 let (preview_len, preview_b64, truncated) = preview_bytes(value_text.as_bytes());
902 let value_text_field = if preview_len == 0 { None } else { Some(value_text.clone()) };
903 state.records.push(ReportRecord::Action(ActionRecord {
904 case_id: case_id.to_owned(),
905 action_ix,
906 action: action_name.to_owned(),
907 status: ActionStatus::Ok,
908 args: json!({"name": name, "select": select}),
909 ok: Some(json!({
910 "source_artifact": name,
911 "select": select,
912 "match_count": match_count,
913 "value_text": value_text_field,
914 "value_text_truncated": truncated
915 })),
916 fail: None,
917 }));
918 state.records.push(ReportRecord::Extract(ExtractRecord {
919 case_id: case_id.to_owned(),
920 extract_ix: state.extract_ix,
921 action_ix,
922 source_artifact: name.to_owned(),
923 format: format.to_owned(),
924 status: ActionStatus::Ok,
925 select: Some(select.to_owned()),
926 summary: Some(json!({
927 "match_count": match_count,
928 "value_text": preview_b64.map(|_| value_text),
929 "value_text_truncated": truncated
930 })),
931 fail: None,
932 }));
933 state.extract_ix += 1;
934 Ok(RuntimeValue::Object(BTreeMap::from([
935 ("format".to_owned(), RuntimeValue::String(format.to_owned())),
936 ("select".to_owned(), RuntimeValue::String(select.to_owned())),
937 ("count".to_owned(), RuntimeValue::Int(match_count as i64)),
938 ("text".to_owned(), RuntimeValue::String(value_text_field.unwrap_or_default())),
939 ])))
940 }
941 Err(fail) => {
942 let action_name = if jsonl { "extract_jsonl" } else { "extract_json" };
943 let format = if jsonl { "jsonl" } else { "json" };
944 state.records.push(ReportRecord::Action(ActionRecord {
945 case_id: case_id.to_owned(),
946 action_ix,
947 action: action_name.to_owned(),
948 status: ActionStatus::Fail,
949 args: json!({"name": name, "select": select}),
950 ok: None,
951 fail: Some(ActionFail {
952 kind: fail.kind.clone(),
953 msg: fail.msg.clone(),
954 code: fail.code.clone(),
955 }),
956 }));
957 state.records.push(ReportRecord::Extract(ExtractRecord {
958 case_id: case_id.to_owned(),
959 extract_ix: state.extract_ix,
960 action_ix,
961 source_artifact: name.to_owned(),
962 format: format.to_owned(),
963 status: ActionStatus::Fail,
964 select: Some(select.to_owned()),
965 summary: None,
966 fail: Some(ActionFail {
967 kind: fail.kind.clone(),
968 msg: fail.msg.clone(),
969 code: fail.code.clone(),
970 }),
971 }));
972 state.extract_ix += 1;
973 Err(fail)
974 }
975 }
976}
977
978fn extract_value(path: &str, select: &str, jsonl: bool) -> Result<(usize, String), RuntimeFail> {
979 let source = fs::read_to_string(path).map_err(|error| RuntimeFail {
980 kind: "io".to_owned(),
981 msg: format!("failed to read `{path}`: {error}"),
982 code: None,
983 })?;
984 let mut matches = Vec::new();
985 if jsonl {
986 for line in source.lines() {
987 if line.trim().is_empty() {
988 continue;
989 }
990 let value = serde_json::from_str::<serde_json::Value>(line).map_err(|error| RuntimeFail {
991 kind: "protocol".to_owned(),
992 msg: format!("invalid jsonl in `{path}`: {error}"),
993 code: None,
994 })?;
995 if let Some(found) = select_json_value(&value, select) {
996 matches.push(found);
997 }
998 }
999 } else {
1000 let value = serde_json::from_str::<serde_json::Value>(&source).map_err(|error| RuntimeFail {
1001 kind: "protocol".to_owned(),
1002 msg: format!("invalid json in `{path}`: {error}"),
1003 code: None,
1004 })?;
1005 if let Some(found) = select_json_value(&value, select) {
1006 matches.push(found);
1007 }
1008 }
1009 let first = matches.first().cloned().unwrap_or_else(|| "".to_owned());
1010 Ok((matches.len(), first))
1011}
1012
1013fn select_json_value(value: &serde_json::Value, select: &str) -> Option<String> {
1014 let path = select.strip_prefix("$.")?;
1015 let mut current = value;
1016 for segment in path.split('.') {
1017 current = current.get(segment)?;
1018 }
1019 Some(match current {
1020 serde_json::Value::String(text) => text.clone(),
1021 other => other.to_string(),
1022 })
1023}
1024
1025fn normalize_joined_path(parts: &[String]) -> String {
1026 let mut path = PathBuf::new();
1027 for part in parts {
1028 path.push(part);
1029 }
1030 normalize_relative_path(&path)
1031}
1032
1033fn execute_inventory_entry(
1034 entry: &InventoryEntry,
1035 providers: Option<&ResolvedProviders>,
1036 timeout_ms: u32,
1037) -> Result<ProcRun, RuntimeFail> {
1038 match &entry.runner {
1039 InventoryRunner::Exec { path, args } => execute_process(path, args, timeout_ms),
1040 InventoryRunner::Sh { command } => execute_process("/bin/sh", &["-c".to_owned(), command.clone()], timeout_ms),
1041 InventoryRunner::Provider { provider, target } => {
1042 let providers = providers.ok_or_else(|| RuntimeFail {
1043 kind: "provider-config".to_owned(),
1044 msg: "provider config required".to_owned(),
1045 code: None,
1046 })?;
1047 let resolved = providers.get(provider).ok_or_else(|| RuntimeFail {
1048 kind: "provider-config".to_owned(),
1049 msg: format!("unresolved provider `{provider}`"),
1050 code: None,
1051 })?;
1052 let response = run_provider_target(resolved, target, timeout_ms).map_err(to_runtime_fail)?;
1053 let out = decode_b64_bytes(&response.out_b64).map_err(to_runtime_fail)?;
1054 let err = decode_b64_bytes(&response.err_b64).map_err(to_runtime_fail)?;
1055 Ok(ProcRun {
1056 exit: response.exit,
1057 out,
1058 err,
1059 telemetry: response.telemetry,
1060 })
1061 }
1062 }
1063}
1064
1065fn execute_process(path: &str, args: &[String], timeout_ms: u32) -> Result<ProcRun, RuntimeFail> {
1066 let started_at = Instant::now();
1067 let usage_before = child_resource_usage();
1068 let mut child = Command::new(path)
1069 .args(args)
1070 .stdout(Stdio::piped())
1071 .stderr(Stdio::piped())
1072 .spawn()
1073 .map_err(|error| RuntimeFail {
1074 kind: "spawn".to_owned(),
1075 msg: error.to_string(),
1076 code: None,
1077 })?;
1078
1079 let deadline = Instant::now() + Duration::from_millis(u64::from(timeout_ms));
1080 loop {
1081 match child.try_wait() {
1082 Ok(Some(_)) => {
1083 let output = child.wait_with_output().map_err(|error| RuntimeFail {
1084 kind: "io".to_owned(),
1085 msg: error.to_string(),
1086 code: None,
1087 })?;
1088 let usage_after = child_resource_usage();
1089 return Ok(ProcRun {
1090 exit: output.status.code().unwrap_or(-1),
1091 out: output.stdout,
1092 err: output.stderr,
1093 telemetry: runner_telemetry(started_at, usage_before, usage_after),
1094 });
1095 }
1096 Ok(None) => {
1097 if Instant::now() >= deadline {
1098 let _ = child.kill();
1099 let _ = child.wait();
1100 return Err(RuntimeFail {
1101 kind: "timeout".to_owned(),
1102 msg: "timeout".to_owned(),
1103 code: None,
1104 });
1105 }
1106 thread::sleep(Duration::from_millis(10));
1107 }
1108 Err(error) => {
1109 return Err(RuntimeFail {
1110 kind: "io".to_owned(),
1111 msg: error.to_string(),
1112 code: None,
1113 })
1114 }
1115 }
1116 }
1117}
1118
1119fn runner_telemetry(
1120 started_at: Instant,
1121 usage_before: Option<ChildResourceUsage>,
1122 usage_after: Option<ChildResourceUsage>,
1123) -> Vec<TelemetryEntry> {
1124 let mut telemetry = vec![TelemetryEntry {
1125 name: "wall_time_ns".to_owned(),
1126 unit: Some("ns".to_owned()),
1127 value: crate::TelemetryValue::Metric {
1128 value: telemetry_number_from_u128(started_at.elapsed().as_nanos()),
1129 },
1130 }];
1131
1132 if let (Some(before), Some(after)) = (usage_before, usage_after) {
1133 telemetry.push(TelemetryEntry {
1134 name: "cpu_user_ns".to_owned(),
1135 unit: Some("ns".to_owned()),
1136 value: crate::TelemetryValue::Metric {
1137 value: Number::from(after.user_ns.saturating_sub(before.user_ns)),
1138 },
1139 });
1140 telemetry.push(TelemetryEntry {
1141 name: "cpu_system_ns".to_owned(),
1142 unit: Some("ns".to_owned()),
1143 value: crate::TelemetryValue::Metric {
1144 value: Number::from(after.system_ns.saturating_sub(before.system_ns)),
1145 },
1146 });
1147 telemetry.push(TelemetryEntry {
1148 name: "peak_rss_bytes".to_owned(),
1149 unit: Some("bytes".to_owned()),
1150 value: crate::TelemetryValue::Metric {
1151 value: Number::from(after.peak_rss_bytes),
1152 },
1153 });
1154 }
1155
1156 telemetry
1157}
1158
1159fn telemetry_number_from_u128(value: u128) -> Number {
1160 let value = value.min(u128::from(u64::MAX));
1161 Number::from(value as u64)
1162}
1163
1164#[derive(Debug, Clone, Copy)]
1165struct ChildResourceUsage {
1166 user_ns: u64,
1167 system_ns: u64,
1168 peak_rss_bytes: u64,
1169}
1170
1171#[cfg(unix)]
1172fn child_resource_usage() -> Option<ChildResourceUsage> {
1173 let mut usage = std::mem::MaybeUninit::<libc::rusage>::zeroed();
1174 let status = unsafe { libc::getrusage(libc::RUSAGE_CHILDREN, usage.as_mut_ptr()) };
1175 if status != 0 {
1176 return None;
1177 }
1178
1179 let usage = unsafe { usage.assume_init() };
1180 Some(ChildResourceUsage {
1181 user_ns: timeval_to_ns(usage.ru_utime),
1182 system_ns: timeval_to_ns(usage.ru_stime),
1183 peak_rss_bytes: peak_rss_bytes(usage.ru_maxrss),
1184 })
1185}
1186
1187#[cfg(not(unix))]
1188fn child_resource_usage() -> Option<ChildResourceUsage> {
1189 None
1190}
1191
1192#[cfg(unix)]
1193fn timeval_to_ns(value: libc::timeval) -> u64 {
1194 let secs = u64::try_from(value.tv_sec).unwrap_or(0);
1195 let micros = u64::try_from(value.tv_usec).unwrap_or(0);
1196 secs.saturating_mul(1_000_000_000)
1197 .saturating_add(micros.saturating_mul(1_000))
1198}
1199
1200#[cfg(unix)]
1201fn peak_rss_bytes(value: libc::c_long) -> u64 {
1202 let value = u64::try_from(value).unwrap_or(0);
1203 #[cfg(target_os = "linux")]
1204 {
1205 value.saturating_mul(1024)
1206 }
1207 #[cfg(not(target_os = "linux"))]
1208 {
1209 value
1210 }
1211}
1212
1213fn append_action_telemetry_records(
1214 records: &mut Vec<ReportRecord>,
1215 case_id: &str,
1216 action_ix: usize,
1217 telemetry: &[TelemetryEntry],
1218) {
1219 for entry in telemetry {
1220 records.push(ReportRecord::Telemetry(TelemetryRecord::action(
1221 case_id.to_owned(),
1222 action_ix,
1223 entry.clone(),
1224 )));
1225 }
1226}
1227
1228fn evaluate_assert(predicate: &Predicate, state: &CaseState) -> ObserverResult<(Status, String)> {
1229 match predicate {
1230 Predicate::Fail { msg } => Ok((Status::Fail, msg.clone())),
1231 _ => {
1232 let pass = evaluate_predicate(predicate, state)?;
1233 if pass {
1234 Ok((Status::Pass, "expectation passed".to_owned()))
1235 } else {
1236 Ok((Status::Fail, "expectation failed".to_owned()))
1237 }
1238 }
1239 }
1240}
1241
1242fn evaluate_predicate(predicate: &Predicate, state: &CaseState) -> ObserverResult<bool> {
1243 match predicate {
1244 Predicate::Compare { op, left, right } => {
1245 let left = eval_value(left, &state.bindings, &state.artifacts)?;
1246 let right = eval_value(right, &state.bindings, &state.artifacts)?;
1247 compare_values(*op, &left, &right)
1248 }
1249 Predicate::IsStatus { left, status } => {
1250 let left = eval_value(left, &state.bindings, &state.artifacts)?;
1251 is_status_value(&left, *status)
1252 }
1253 Predicate::IsStatusClass { left, class } => {
1254 let left = eval_value(left, &state.bindings, &state.artifacts)?;
1255 is_status_class_value(&left, *class)
1256 }
1257 Predicate::HasHeader { left, name } => {
1258 let left = eval_value(left, &state.bindings, &state.artifacts)?;
1259 has_header_value(&left, name)
1260 }
1261 Predicate::Contains { left, right } => {
1262 let left = eval_value(left, &state.bindings, &state.artifacts)?;
1263 let right = eval_value(right, &state.bindings, &state.artifacts)?;
1264 contains_value(&left, &right)
1265 }
1266 Predicate::ContainsRegex { left, regex } | Predicate::Match { left, regex } => {
1267 let left = eval_value(left, &state.bindings, &state.artifacts)?;
1268 match left {
1269 RuntimeValue::String(value) => Ok(Regex::new(regex)
1270 .map_err(|error| ObserverError::Runtime(error.to_string()))?
1271 .is_match(&value)),
1272 RuntimeValue::Bytes(bytes) => match String::from_utf8(bytes) {
1273 Ok(value) => Ok(Regex::new(regex)
1274 .map_err(|error| ObserverError::Runtime(error.to_string()))?
1275 .is_match(&value)),
1276 Err(_) => Ok(false),
1277 },
1278 _ => Err(ObserverError::Runtime(
1279 "regex predicates require string or bytes operands".to_owned(),
1280 )),
1281 }
1282 }
1283 Predicate::StartsWith { left, right } => {
1284 let left = eval_value(left, &state.bindings, &state.artifacts)?;
1285 let right = eval_value(right, &state.bindings, &state.artifacts)?;
1286 starts_with_value(&left, &right)
1287 }
1288 Predicate::EndsWith { left, right } => {
1289 let left = eval_value(left, &state.bindings, &state.artifacts)?;
1290 let right = eval_value(right, &state.bindings, &state.artifacts)?;
1291 ends_with_value(&left, &right)
1292 }
1293 Predicate::Fail { .. } => Ok(false),
1294 }
1295}
1296
1297fn compare_values(op: CompareOp, left: &RuntimeValue, right: &RuntimeValue) -> ObserverResult<bool> {
1298 match (left, right) {
1299 (RuntimeValue::Int(left), RuntimeValue::Int(right)) => Ok(match op {
1300 CompareOp::Eq => left == right,
1301 CompareOp::Ne => left != right,
1302 CompareOp::Lt => left < right,
1303 CompareOp::Le => left <= right,
1304 CompareOp::Gt => left > right,
1305 CompareOp::Ge => left >= right,
1306 }),
1307 (RuntimeValue::String(left), RuntimeValue::String(right)) => Ok(match op {
1308 CompareOp::Eq => left == right,
1309 CompareOp::Ne => left != right,
1310 _ => false,
1311 }),
1312 (RuntimeValue::Bytes(left), RuntimeValue::Bytes(right)) => Ok(match op {
1313 CompareOp::Eq => left == right,
1314 CompareOp::Ne => left != right,
1315 _ => false,
1316 }),
1317 (RuntimeValue::Bytes(left), RuntimeValue::String(right)) => compare_values(
1318 op,
1319 &RuntimeValue::Bytes(left.clone()),
1320 &RuntimeValue::Bytes(right.as_bytes().to_vec()),
1321 ),
1322 (RuntimeValue::String(left), RuntimeValue::Bytes(right)) => compare_values(
1323 op,
1324 &RuntimeValue::Bytes(left.as_bytes().to_vec()),
1325 &RuntimeValue::Bytes(right.clone()),
1326 ),
1327 _ => Err(ObserverError::Runtime(
1328 "unsupported operand types for comparison".to_owned(),
1329 )),
1330 }
1331}
1332
1333fn contains_value(left: &RuntimeValue, right: &RuntimeValue) -> ObserverResult<bool> {
1334 match (left, right) {
1335 (RuntimeValue::String(left), RuntimeValue::String(right)) => Ok(left.contains(right)),
1336 (RuntimeValue::Bytes(left), RuntimeValue::Bytes(right)) => {
1337 Ok(left.windows(right.len()).any(|window| window == right.as_slice()))
1338 }
1339 (RuntimeValue::Bytes(left), RuntimeValue::String(right)) => contains_value(
1340 &RuntimeValue::Bytes(left.clone()),
1341 &RuntimeValue::Bytes(right.as_bytes().to_vec()),
1342 ),
1343 (RuntimeValue::String(left), RuntimeValue::Bytes(right)) => contains_value(
1344 &RuntimeValue::Bytes(left.as_bytes().to_vec()),
1345 &RuntimeValue::Bytes(right.clone()),
1346 ),
1347 _ => Err(ObserverError::Runtime(
1348 "unsupported operand types for contains".to_owned(),
1349 )),
1350 }
1351}
1352
1353fn starts_with_value(left: &RuntimeValue, right: &RuntimeValue) -> ObserverResult<bool> {
1354 match (left, right) {
1355 (RuntimeValue::String(left), RuntimeValue::String(right)) => Ok(left.starts_with(right)),
1356 (RuntimeValue::Bytes(left), RuntimeValue::Bytes(right)) => Ok(left.starts_with(right)),
1357 (RuntimeValue::Bytes(left), RuntimeValue::String(right)) => starts_with_value(
1358 &RuntimeValue::Bytes(left.clone()),
1359 &RuntimeValue::Bytes(right.as_bytes().to_vec()),
1360 ),
1361 (RuntimeValue::String(left), RuntimeValue::Bytes(right)) => starts_with_value(
1362 &RuntimeValue::Bytes(left.as_bytes().to_vec()),
1363 &RuntimeValue::Bytes(right.clone()),
1364 ),
1365 _ => Err(ObserverError::Runtime(
1366 "unsupported operand types for startsWith".to_owned(),
1367 )),
1368 }
1369}
1370
1371fn ends_with_value(left: &RuntimeValue, right: &RuntimeValue) -> ObserverResult<bool> {
1372 match (left, right) {
1373 (RuntimeValue::String(left), RuntimeValue::String(right)) => Ok(left.ends_with(right)),
1374 (RuntimeValue::Bytes(left), RuntimeValue::Bytes(right)) => Ok(left.ends_with(right)),
1375 (RuntimeValue::Bytes(left), RuntimeValue::String(right)) => ends_with_value(
1376 &RuntimeValue::Bytes(left.clone()),
1377 &RuntimeValue::Bytes(right.as_bytes().to_vec()),
1378 ),
1379 (RuntimeValue::String(left), RuntimeValue::Bytes(right)) => ends_with_value(
1380 &RuntimeValue::Bytes(left.as_bytes().to_vec()),
1381 &RuntimeValue::Bytes(right.clone()),
1382 ),
1383 _ => Err(ObserverError::Runtime(
1384 "unsupported operand types for endsWith".to_owned(),
1385 )),
1386 }
1387}
1388
1389fn has_header_value(left: &RuntimeValue, name: &str) -> ObserverResult<bool> {
1390 let header_name = name.to_ascii_lowercase();
1391 match left {
1392 RuntimeValue::HttpResp(response) => Ok(response.headers.contains_key(&header_name)),
1393 _ => Err(ObserverError::Runtime(
1394 "hasHeader predicates require an http response operand".to_owned(),
1395 )),
1396 }
1397}
1398
1399fn is_status_value(left: &RuntimeValue, status: i64) -> ObserverResult<bool> {
1400 match left {
1401 RuntimeValue::HttpResp(response) => Ok(response.status == status),
1402 _ => Err(ObserverError::Runtime(
1403 "isStatus predicates require an http response operand".to_owned(),
1404 )),
1405 }
1406}
1407
1408fn is_status_class_value(left: &RuntimeValue, class: i64) -> ObserverResult<bool> {
1409 if !(1..=5).contains(&class) {
1410 return Err(ObserverError::Runtime(
1411 "isStatusClass requires a status class in the range 1..=5".to_owned(),
1412 ));
1413 }
1414 match left {
1415 RuntimeValue::HttpResp(response) => Ok(response.status / 100 == class),
1416 _ => Err(ObserverError::Runtime(
1417 "isStatusClass predicates require an http response operand".to_owned(),
1418 )),
1419 }
1420}
1421
1422fn eval_value(
1423 expr: &ValueExpr,
1424 bindings: &BTreeMap<String, RuntimeValue>,
1425 artifacts: &BTreeMap<String, ArtifactBinding>,
1426) -> ObserverResult<RuntimeValue> {
1427 match expr {
1428 ValueExpr::Binding { name } => bindings
1429 .get(name)
1430 .cloned()
1431 .ok_or_else(|| ObserverError::Runtime(format!("unbound identifier `{name}`"))),
1432 ValueExpr::String { value } => Ok(RuntimeValue::String(value.clone())),
1433 ValueExpr::BytesUtf8 { value } => Ok(RuntimeValue::Bytes(value.as_bytes().to_vec())),
1434 ValueExpr::Int { value } => Ok(RuntimeValue::Int(*value)),
1435 ValueExpr::Field { base, name } => {
1436 let value = eval_value(base, bindings, artifacts)?;
1437 match (value, name.as_str()) {
1438 (RuntimeValue::ProcRun(run), "exit") => Ok(RuntimeValue::Int(run.exit.into())),
1439 (RuntimeValue::ProcRun(run), "out") => Ok(RuntimeValue::Bytes(run.out)),
1440 (RuntimeValue::ProcRun(run), "err") => Ok(RuntimeValue::Bytes(run.err)),
1441 (RuntimeValue::HttpResp(response), "status") => Ok(RuntimeValue::Int(response.status)),
1442 (RuntimeValue::HttpResp(response), "body") => Ok(RuntimeValue::Bytes(response.body)),
1443 (RuntimeValue::TcpResp(response), "bytes") => Ok(RuntimeValue::Bytes(response.bytes)),
1444 (RuntimeValue::Object(fields), field_name) => fields.get(field_name).cloned().ok_or_else(|| {
1445 ObserverError::Runtime(format!("unknown field `{field_name}`"))
1446 }),
1447 (RuntimeValue::Fail(fail), "kind") => Ok(RuntimeValue::String(fail.kind)),
1448 (RuntimeValue::Fail(fail), "msg") => Ok(RuntimeValue::String(fail.msg)),
1449 (RuntimeValue::Fail(fail), "code") => {
1450 if let Some(code) = fail.code {
1451 Ok(RuntimeValue::String(code))
1452 } else {
1453 Err(ObserverError::Runtime(
1454 "optional fail field `code` is absent".to_owned(),
1455 ))
1456 }
1457 }
1458 _ => Err(ObserverError::Runtime(format!("unknown field `{name}`"))),
1459 }
1460 }
1461 ValueExpr::Header { base, name } => {
1462 let header_name = name.to_ascii_lowercase();
1463 let value = eval_value(base, bindings, artifacts)?;
1464 match value {
1465 RuntimeValue::HttpResp(response) => response
1466 .headers
1467 .get(&header_name)
1468 .cloned()
1469 .map(|value| RuntimeValue::String(value))
1470 .ok_or_else(|| {
1471 ObserverError::Runtime(format!(
1472 "header `{header_name}` is absent"
1473 ))
1474 }),
1475 _ => Err(ObserverError::Runtime(
1476 "header lookup requires an http response operand".to_owned(),
1477 )),
1478 }
1479 }
1480 ValueExpr::ArtifactPath { name } => artifacts
1481 .get(name)
1482 .map(|binding| RuntimeValue::String(binding.path.clone()))
1483 .ok_or_else(|| ObserverError::Runtime(format!("unknown artifact `{name}`"))),
1484 ValueExpr::JoinPath { parts } => {
1485 let mut strings = Vec::with_capacity(parts.len());
1486 for part in parts {
1487 let value = eval_value(part, bindings, artifacts)?;
1488 strings.push(as_string(value)?);
1489 }
1490 Ok(RuntimeValue::String(normalize_joined_path(&strings)))
1491 }
1492 }
1493}
1494
1495fn as_string(value: RuntimeValue) -> ObserverResult<String> {
1496 match value {
1497 RuntimeValue::String(value) => Ok(value),
1498 _ => Err(ObserverError::Runtime("expected string value".to_owned())),
1499 }
1500}
1501
1502fn as_bytes(value: RuntimeValue) -> ObserverResult<Vec<u8>> {
1503 match value {
1504 RuntimeValue::Bytes(value) => Ok(value),
1505 RuntimeValue::String(value) => Ok(value.into_bytes()),
1506 _ => Err(ObserverError::Runtime("expected bytes or string value".to_owned())),
1507 }
1508}
1509
1510fn proc_ok_payload(run: &ProcRun) -> serde_json::Value {
1511 let (out_len, out_preview, out_truncated) = preview_bytes(&run.out);
1512 let (err_len, err_preview, err_truncated) = preview_bytes(&run.err);
1513 let mut payload = json!({
1514 "exit": run.exit,
1515 "out_len": out_len,
1516 "err_len": err_len,
1517 });
1518 if let Some(preview) = out_preview {
1519 payload["out_preview_b64"] = json!(preview);
1520 }
1521 if let Some(preview) = err_preview {
1522 payload["err_preview_b64"] = json!(preview);
1523 }
1524 if let Some(truncated) = out_truncated {
1525 payload["out_truncated"] = json!(truncated);
1526 }
1527 if let Some(truncated) = err_truncated {
1528 payload["err_truncated"] = json!(truncated);
1529 }
1530 payload
1531}
1532
1533fn http_ok_payload(response: &HttpResp) -> serde_json::Value {
1534 let (body_len, body_preview, body_truncated) = preview_bytes(&response.body);
1535 let mut payload = json!({
1536 "status": response.status,
1537 "body_len": body_len,
1538 "headers": response.headers,
1539 });
1540 if let Some(preview) = body_preview {
1541 payload["body_preview_b64"] = json!(preview);
1542 }
1543 if let Some(truncated) = body_truncated {
1544 payload["body_truncated"] = json!(truncated);
1545 }
1546 payload
1547}
1548
1549fn tcp_ok_payload(response: &TcpResp) -> serde_json::Value {
1550 let (bytes_len, bytes_preview, bytes_truncated) = preview_bytes(&response.bytes);
1551 let mut payload = json!({
1552 "bytes_len": bytes_len,
1553 });
1554 if let Some(preview) = bytes_preview {
1555 payload["bytes_preview_b64"] = json!(preview);
1556 }
1557 if let Some(truncated) = bytes_truncated {
1558 payload["bytes_truncated"] = json!(truncated);
1559 }
1560 payload
1561}
1562
1563fn execute_http_get(url: &str, timeout_ms: u32) -> Result<HttpResp, RuntimeFail> {
1564 let client = reqwest::blocking::Client::builder()
1565 .timeout(Duration::from_millis(u64::from(timeout_ms)))
1566 .build()
1567 .map_err(|error| RuntimeFail {
1568 kind: "http".to_owned(),
1569 msg: error.to_string(),
1570 code: None,
1571 })?;
1572
1573 let response = client.get(url).send().map_err(http_error_to_runtime_fail)?;
1574 let status = i64::from(response.status().as_u16());
1575 let mut headers = BTreeMap::new();
1576 for (name, value) in response.headers() {
1577 let header_value = value.to_str().map_err(|error| RuntimeFail {
1578 kind: "http".to_owned(),
1579 msg: error.to_string(),
1580 code: None,
1581 })?;
1582 headers.insert(name.as_str().to_ascii_lowercase(), header_value.to_owned());
1583 }
1584 let body = response.bytes().map_err(http_error_to_runtime_fail)?.to_vec();
1585
1586 Ok(HttpResp {
1587 status,
1588 body,
1589 headers,
1590 })
1591}
1592
1593fn execute_tcp(
1594 address: &str,
1595 send: &[u8],
1596 recv_max: usize,
1597 timeout_ms: u32,
1598) -> Result<TcpResp, RuntimeFail> {
1599 let timeout = Duration::from_millis(u64::from(timeout_ms));
1600 let addrs = resolve_socket_addrs(address)?;
1601 let mut last_error = None;
1602
1603 for socket_addr in addrs {
1604 match TcpStream::connect_timeout(&socket_addr, timeout) {
1605 Ok(mut stream) => {
1606 stream
1607 .set_read_timeout(Some(timeout))
1608 .map_err(io_runtime_fail)?;
1609 stream
1610 .set_write_timeout(Some(timeout))
1611 .map_err(io_runtime_fail)?;
1612 stream.write_all(send).map_err(io_runtime_fail)?;
1613
1614 let mut bytes = Vec::new();
1615 let mut buffer = vec![0u8; recv_max.min(4096).max(1)];
1616 while bytes.len() < recv_max {
1617 let remaining = recv_max - bytes.len();
1618 let read_len = remaining.min(buffer.len());
1619 match stream.read(&mut buffer[..read_len]) {
1620 Ok(0) => break,
1621 Ok(count) => bytes.extend_from_slice(&buffer[..count]),
1622 Err(error)
1623 if error.kind() == std::io::ErrorKind::WouldBlock
1624 || error.kind() == std::io::ErrorKind::TimedOut =>
1625 {
1626 if bytes.is_empty() {
1627 return Err(RuntimeFail {
1628 kind: "timeout".to_owned(),
1629 msg: "timeout".to_owned(),
1630 code: None,
1631 });
1632 }
1633 break;
1634 }
1635 Err(error) => return Err(io_runtime_fail(error)),
1636 }
1637 }
1638
1639 return Ok(TcpResp { bytes });
1640 }
1641 Err(error) => last_error = Some(error),
1642 }
1643 }
1644
1645 Err(last_error.map(io_runtime_fail).unwrap_or(RuntimeFail {
1646 kind: "tcp".to_owned(),
1647 msg: format!("no socket addresses resolved for `{address}`"),
1648 code: None,
1649 }))
1650}
1651
1652fn resolve_socket_addrs(address: &str) -> Result<Vec<SocketAddr>, RuntimeFail> {
1653 let mut addrs = address.to_socket_addrs().map_err(io_runtime_fail)?.collect::<Vec<_>>();
1654 addrs.sort_by(|left, right| left.to_string().as_bytes().cmp(right.to_string().as_bytes()));
1655 addrs.dedup();
1656 Ok(addrs)
1657}
1658
1659fn http_error_to_runtime_fail(error: reqwest::Error) -> RuntimeFail {
1660 let kind = if error.is_timeout() {
1661 "timeout"
1662 } else {
1663 "http"
1664 };
1665 RuntimeFail {
1666 kind: kind.to_owned(),
1667 msg: error.to_string(),
1668 code: error.status().map(|status| status.as_u16().to_string()),
1669 }
1670}
1671
1672fn io_runtime_fail(error: std::io::Error) -> RuntimeFail {
1673 RuntimeFail {
1674 kind: "io".to_owned(),
1675 msg: error.to_string(),
1676 code: None,
1677 }
1678}
1679
1680fn to_runtime_fail(error: ObserverError) -> RuntimeFail {
1681 match error {
1682 ObserverError::Runtime(msg) if msg == "timeout" => RuntimeFail {
1683 kind: "timeout".to_owned(),
1684 msg,
1685 code: None,
1686 },
1687 other => RuntimeFail {
1688 kind: "runtime".to_owned(),
1689 msg: other.to_string(),
1690 code: None,
1691 },
1692 }
1693}