use std::collections::BTreeMap;
use std::path::PathBuf;
use std::time::Duration;
use lifeloop::router::{
AdapterRegistry, AdapterResolution, CallbackInvoker, RoutingPlan, SubprocessCallbackInvoker,
SubprocessInvokerConfig, SubprocessInvokerError, route,
};
use lifeloop::{
AcceptablePlacement, AdapterManifest, AdapterRole, CallbackRequest, ConformanceLevel,
FailureClass, FrameContext, IntegrationMode, LifecycleEventKind, ManifestContextPressure,
ManifestPlacementClass, ManifestPlacementSupport, ManifestReceipts, PayloadEnvelope,
PayloadRef, PlacementClass, ReceiptStatus, RegisteredAdapter, RequirementLevel, SCHEMA_VERSION,
SupportState,
};
const FAKE_ID: &str = "ccd";
const FAKE_VERSION: &str = "0.1.0";
fn fake_ccd_bin() -> PathBuf {
PathBuf::from(env!("CARGO_BIN_EXE_lifeloop-fake-ccd-client"))
}
fn manifest() -> AdapterManifest {
let mut placement = BTreeMap::new();
placement.insert(
ManifestPlacementClass::PreFrameTrailing,
ManifestPlacementSupport {
support: SupportState::Native,
max_bytes: None,
},
);
AdapterManifest {
contract_version: SCHEMA_VERSION.to_string(),
adapter_id: FAKE_ID.into(),
adapter_version: FAKE_VERSION.into(),
display_name: "Fake CCD".into(),
role: AdapterRole::PrimaryWorker,
integration_modes: vec![IntegrationMode::NativeHook],
lifecycle_events: BTreeMap::new(),
placement,
context_pressure: ManifestContextPressure {
support: SupportState::Native,
evidence: None,
},
receipts: ManifestReceipts {
native: false,
lifeloop_synthesized: true,
receipt_ledger: SupportState::Unavailable,
},
session_identity: None,
session_rename: None,
renewal: None,
approval_surface: None,
failure_modes: Vec::new(),
telemetry_sources: Vec::new(),
known_degradations: Vec::new(),
}
}
struct Fixture(AdapterManifest);
impl AdapterRegistry for Fixture {
fn resolve(&self, id: &str, version: &str) -> AdapterResolution {
if id != self.0.adapter_id {
return AdapterResolution::UnknownId;
}
if version != self.0.adapter_version {
return AdapterResolution::VersionMismatch {
registered_version: self.0.adapter_version.clone(),
};
}
AdapterResolution::Found(RegisteredAdapter {
manifest: self.0.clone(),
conformance: ConformanceLevel::PreConformance,
})
}
}
fn frame_request() -> CallbackRequest {
CallbackRequest {
schema_version: SCHEMA_VERSION.to_string(),
event: LifecycleEventKind::FrameOpening,
event_id: "evt-e2e-1".into(),
adapter_id: FAKE_ID.into(),
adapter_version: FAKE_VERSION.into(),
integration_mode: IntegrationMode::NativeHook,
invocation_id: "inv-e2e-1".into(),
harness_session_id: Some("sess-e2e-1".into()),
harness_run_id: Some("run-e2e-1".into()),
harness_task_id: None,
frame_context: Some(FrameContext::top_level("frm-e2e-1")),
capability_snapshot_ref: None,
payload_refs: vec![PayloadRef {
payload_id: "pay-ref-1".into(),
payload_kind: "instruction_frame".into(),
content_digest: None,
byte_size: Some(11),
}],
sequence: None,
idempotency_key: Some("idem-e2e-1".into()),
metadata: serde_json::Map::new(),
}
}
fn build_plan(req: &CallbackRequest) -> RoutingPlan {
let fx = Fixture(manifest());
route(req, &fx).expect("plan builds")
}
fn invoker_with(behavior: &str, timeout: Duration) -> SubprocessCallbackInvoker {
let cfg = SubprocessInvokerConfig::new(fake_ccd_bin(), timeout).arg(behavior);
SubprocessCallbackInvoker::new(cfg)
}
#[test]
fn frame_opening_returns_delivered_with_payload() {
let invoker = invoker_with("ok", Duration::from_secs(5));
let req = frame_request();
let plan = build_plan(&req);
let resp = invoker.invoke(&plan, &[]).expect("subprocess returns ok");
assert_eq!(resp.status, ReceiptStatus::Delivered); assert_eq!(resp.client_payloads.len(), 1); let p = &resp.client_payloads[0];
assert_eq!(p.client_id, "ccd"); assert_eq!(p.payload_kind, "ccd.instruction_frame"); assert_eq!(p.idempotency_key.as_deref(), Some("idem-e2e-1")); }
#[test]
fn frame_opening_delivers_payload_envelope_to_subprocess() {
let invoker = invoker_with("ok", Duration::from_secs(5));
let req = frame_request();
let plan = build_plan(&req);
let body = "round-trip-body-from-e2e";
let payload = PayloadEnvelope {
schema_version: SCHEMA_VERSION.to_string(),
payload_id: "pay-e2e-payload-1".into(),
client_id: "ccd".into(),
payload_kind: "ccd.test_frame".into(),
format: "client-defined".into(),
content_encoding: "utf8".into(),
body: Some(body.into()),
body_ref: None,
byte_size: body.len() as u64,
content_digest: None,
acceptable_placements: vec![AcceptablePlacement {
placement: PlacementClass::PrePromptFrame,
requirement: RequirementLevel::Preferred,
}],
idempotency_key: None,
expires_at_epoch_s: None,
redaction: None,
metadata: serde_json::Map::new(),
};
let resp = invoker
.invoke(&plan, std::slice::from_ref(&payload))
.expect("subprocess returns ok");
assert_eq!(resp.status, ReceiptStatus::Delivered);
assert_eq!(resp.client_payloads.len(), 1);
let echoed = &resp.client_payloads[0];
assert_eq!(echoed.payload_kind, "ccd.test_frame");
assert_eq!(echoed.body.as_deref(), Some(body));
}
#[test]
fn session_started_returns_delivered_with_no_payload() {
let invoker = invoker_with("ok", Duration::from_secs(5));
let mut req = frame_request();
req.event = LifecycleEventKind::SessionStarted;
req.frame_context = None; let plan = build_plan(&req);
let resp = invoker.invoke(&plan, &[]).expect("subprocess returns ok");
assert_eq!(resp.status, ReceiptStatus::Delivered);
assert!(resp.client_payloads.is_empty()); }
#[test]
fn malformed_stdout_maps_to_invalid_request() {
let invoker = invoker_with("malformed", Duration::from_secs(5));
let plan = build_plan(&frame_request());
let err = invoker.invoke(&plan, &[]).unwrap_err();
assert!(
matches!(err, SubprocessInvokerError::ParseResponse(_)),
"expected ParseResponse, got {err:?}"
);
let fc: FailureClass = (&err).into();
assert_eq!(fc, FailureClass::InvalidRequest);
}
#[test]
fn nonzero_exit_maps_to_transport_error() {
let invoker = invoker_with("nonzero", Duration::from_secs(5));
let plan = build_plan(&frame_request());
let err = invoker.invoke(&plan, &[]).unwrap_err();
match &err {
SubprocessInvokerError::NonZeroExit { code, stderr } => {
assert_eq!(*code, Some(7));
assert!(stderr.contains("simulated transport failure"));
}
other => panic!("expected NonZeroExit, got {other:?}"),
}
let fc: FailureClass = (&err).into();
assert_eq!(fc, FailureClass::TransportError);
}
#[test]
fn nonzero_exit_dominates_broken_pipe_from_large_request() {
let invoker = invoker_with("nonzero", Duration::from_secs(5));
let plan = build_plan(&frame_request());
let body = "x".repeat(2 * 1024 * 1024);
let payload = PayloadEnvelope {
schema_version: SCHEMA_VERSION.to_string(),
payload_id: "pay-large-broken-pipe".into(),
client_id: "ccd".into(),
payload_kind: "ccd.large_request".into(),
format: "client-defined".into(),
content_encoding: "utf8".into(),
body: Some(body.clone()),
body_ref: None,
byte_size: body.len() as u64,
content_digest: None,
acceptable_placements: vec![AcceptablePlacement {
placement: PlacementClass::PrePromptFrame,
requirement: RequirementLevel::Preferred,
}],
idempotency_key: None,
expires_at_epoch_s: None,
redaction: None,
metadata: serde_json::Map::new(),
};
let err = invoker
.invoke(&plan, std::slice::from_ref(&payload))
.unwrap_err();
match &err {
SubprocessInvokerError::NonZeroExit { code, stderr } => {
assert_eq!(*code, Some(7));
assert!(stderr.contains("simulated transport failure"));
}
other => panic!("expected NonZeroExit to dominate BrokenPipe, got {other:?}"),
}
let fc: FailureClass = (&err).into();
assert_eq!(fc, FailureClass::TransportError);
}
#[test]
fn zero_exit_broken_pipe_surfaces_write_request() {
let invoker = invoker_with("exit_zero_no_read", Duration::from_secs(5));
let plan = build_plan(&frame_request());
let body = "x".repeat(2 * 1024 * 1024);
let payload = PayloadEnvelope {
schema_version: SCHEMA_VERSION.to_string(),
payload_id: "pay-large-zero-broken-pipe".into(),
client_id: "ccd".into(),
payload_kind: "ccd.large_request".into(),
format: "client-defined".into(),
content_encoding: "utf8".into(),
body: Some(body.clone()),
body_ref: None,
byte_size: body.len() as u64,
content_digest: None,
acceptable_placements: vec![AcceptablePlacement {
placement: PlacementClass::PrePromptFrame,
requirement: RequirementLevel::Preferred,
}],
idempotency_key: None,
expires_at_epoch_s: None,
redaction: None,
metadata: serde_json::Map::new(),
};
let err = invoker
.invoke(&plan, std::slice::from_ref(&payload))
.unwrap_err();
match &err {
SubprocessInvokerError::WriteRequest(e) if e.kind() == std::io::ErrorKind::BrokenPipe => {}
other => panic!("expected WriteRequest(BrokenPipe), got {other:?}"),
}
let fc: FailureClass = (&err).into();
assert_eq!(fc, FailureClass::TransportError);
}
#[test]
fn hung_subprocess_is_killed_after_timeout() {
let invoker = invoker_with("hang", Duration::from_millis(150));
let plan = build_plan(&frame_request());
let err = invoker.invoke(&plan, &[]).unwrap_err();
assert!(
matches!(err, SubprocessInvokerError::Timeout),
"expected Timeout, got {err:?}"
);
let fc: FailureClass = (&err).into();
assert_eq!(fc, FailureClass::Timeout);
}
#[test]
fn oversized_stdout_is_rejected_without_parsing() {
let invoker = invoker_with("huge_stdout", Duration::from_secs(5));
let plan = build_plan(&frame_request());
let err = invoker.invoke(&plan, &[]).unwrap_err();
assert!(
matches!(err, SubprocessInvokerError::ReadResponse(_)),
"expected ReadResponse for oversized stdout, got {err:?}"
);
let fc: FailureClass = (&err).into();
assert_eq!(fc, FailureClass::TransportError);
}
#[cfg(unix)]
#[test]
fn inherited_stdout_pipe_times_out_and_is_cleaned_up() {
let invoker = invoker_with("hold_stdout_open", Duration::from_millis(150));
let plan = build_plan(&frame_request());
let err = invoker.invoke(&plan, &[]).unwrap_err();
assert!(
matches!(err, SubprocessInvokerError::Timeout),
"expected Timeout for inherited stdout pipe, got {err:?}"
);
}
#[test]
fn receipt_emitted_is_rejected_before_spawn() {
let cfg = SubprocessInvokerConfig::new(
PathBuf::from("/definitely/does/not/exist/lifeloop-fake-ccd-client-missing"),
Duration::from_secs(5),
);
let invoker = SubprocessCallbackInvoker::new(cfg);
let mut req = frame_request();
req.event = LifecycleEventKind::ReceiptEmitted;
req.frame_context = None;
req.idempotency_key = None; let plan = build_plan(&req);
let err = invoker.invoke(&plan, &[]).unwrap_err();
assert!(
matches!(err, SubprocessInvokerError::ReceiptEmittedRejected(_)),
"expected ReceiptEmittedRejected (pre-spawn guard), got {err:?}"
);
let fc: FailureClass = (&err).into();
assert_eq!(fc, FailureClass::InvalidRequest);
}