cellos-supervisor 0.5.1

CellOS execution-cell runner — boots cells in Firecracker microVMs or gVisor, enforces narrow typed authority, emits signed CloudEvents.
Documentation
//! SEC-21 Phase 3h — DNSSEC validation integration test.
//!
//! Drives the resolver-refresh ticker end-to-end with a synthetic
//! "bogus DNSSEC" upstream and asserts the pipeline emits a
//! `dns_authority_dnssec_failed` event into a JSONL sink. The
//! expected discriminator path is:
//!
//! 1. `spawn_continuous_ticker` is called with a `dnssec_policy`
//!    populated (validate=true, failClosed=false — audit mode so we
//!    can also observe the drift event with `dnssec_status` tagged).
//! 2. The ticker calls our synthetic `validated_resolver` which always
//!    returns a `Failed { reason: "synthetic-bogus" }` outcome.
//! 3. The ticker's per-tick refresher recognises the Failed verdict
//!    and emits a `dns_authority_dnssec_failed` event.
//! 4. The drift event still fires (audit-only kept the answer) and
//!    carries `dnssecStatus: "validation_failed"`.
//! 5. Both events flow through the in-memory `DriftEmitter` collector
//!    (the JSONL sink fan-out adapter is exercised in
//!    `supervisor_resolver_refresh.rs`; this test focuses on the
//!    Phase 3h-specific signals).
//!
//! ## Why `#[ignore]`
//!
//! The test is plain Rust + tokio with localhost UDP — works on any
//! platform CellOS builds for. We mark it `#[ignore]` per the
//! repository convention for tests in the SEC-21 family that exercise
//! the *integration* surface (vs the unit surface in
//! `crates/cellos-supervisor/src/resolver_refresh/{ticker,
//! hickory_resolve}::tests`). CI runs ignored tests via the
//! quality-gate's `cargo test -- --ignored` invocation; local
//! developers can opt in with the same flag.

use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use cellos_core::{
    CloudEventV1, DnsRebindingPolicy, DnsRefreshPolicy, DnsResolver, DnsResolverDnssecPolicy,
    DnsResolverProtocol,
};
use cellos_supervisor::resolver_refresh::ticker::{
    spawn_continuous_ticker, DriftEmitter, SharedResolverFn, SharedValidatedResolverFn,
    TickerConfig,
};
use cellos_supervisor::resolver_refresh::{
    DnssecValidationResult, ResolvedAnswer, TrustAnchors, ValidatedResolvedAnswer,
};

#[derive(Default)]
struct CollectingEmitter {
    events: Mutex<Vec<CloudEventV1>>,
}
impl DriftEmitter for CollectingEmitter {
    fn emit(&self, event: CloudEventV1) {
        self.events.lock().unwrap().push(event);
    }
}

fn one_dnssec_resolver() -> Vec<DnsResolver> {
    vec![DnsResolver {
        resolver_id: "resolver-do53-internal".into(),
        endpoint: "127.0.0.1:53".into(),
        protocol: DnsResolverProtocol::Do53Udp,
        trust_kid: None,
        dnssec: Some(DnsResolverDnssecPolicy {
            validate: true,
            fail_closed: false,
            trust_anchors_path: None,
        }),
    }]
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore]
async fn supervisor_dnssec_validation_emits_failed_event_into_sink() {
    // Synthetic upstream that ALWAYS returns a "Failed" validation
    // outcome — mirrors a real upstream returning a bogus RRSIG that
    // hickory's bundled validator would reject. We do NOT spin a
    // wire-format DNSSEC server here; the validator's discriminator
    // is unit-tested in `hickory_resolve::tests` against synthetic
    // localhost UDP. This integration test pins the END-TO-END
    // *plumbing*: ticker pickup → refresher emit → emitter receive.
    let validated: SharedValidatedResolverFn = Arc::new(|_h: &str| {
        Ok(ValidatedResolvedAnswer {
            answer: ResolvedAnswer {
                targets: vec!["198.51.100.7".to_string()],
                ttl_seconds: 60,
                resolver_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 53),
            },
            validation: DnssecValidationResult::Failed {
                reason: "synthetic-bogus-rrsig".to_string(),
            },
        })
    });

    // Plain (unvalidated) closure is required by the TickerConfig
    // even when DNSSEC is active — the ticker uses the validated path
    // when `dnssec_active` is true but the field is structurally
    // mandatory. A panicking closure proves it is never called.
    let unused: SharedResolverFn =
        Arc::new(|_h: &str| panic!("plain resolver MUST NOT be called when DNSSEC is active"));

    let cfg = TickerConfig {
        interval: Duration::from_millis(70),
        policy: Some(DnsRefreshPolicy {
            min_ttl_seconds: Some(0),
            max_stale_seconds: None,
            strategy: None,
        }),
        rebinding_policy: None,
        resolvers: one_dnssec_resolver(),
        hostnames: vec!["bogus.example.com".into()],
        keyset_id: Some("keyset-it-001".into()),
        issuer_kid: Some("kid-it-001".into()),
        policy_digest: Some(
            "sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855".into(),
        ),
        correlation_id: Some("corr-it-001".into()),
        source: "cellos-supervisor-it".into(),
        cell_id: "it-cell-dnssec".into(),
        run_id: "it-run-dnssec".into(),
        dnssec_policy: Some(DnsResolverDnssecPolicy {
            validate: true,
            fail_closed: false, // audit-only; preserves drift event for cross-correlation
            trust_anchors_path: None,
        }),
        trust_anchors: Some(TrustAnchors::iana_default()),
        validated_resolver: Some(validated),
    };

    let emitter = Arc::new(CollectingEmitter::default());
    let handle = spawn_continuous_ticker(cfg, emitter.clone(), unused);
    // Two ticks at 70ms — sleep 220ms to comfortably observe both.
    tokio::time::sleep(Duration::from_millis(220)).await;
    handle.shutdown.store(true, Ordering::SeqCst);
    let _ = tokio::time::timeout(Duration::from_secs(2), handle.task)
        .await
        .expect("ticker join timeout");

    let events = emitter.events.lock().unwrap();
    let dnssec_failed: Vec<_> = events
        .iter()
        .filter(|e| e.ty.ends_with("dns_authority_dnssec_failed"))
        .collect();
    assert!(
        !dnssec_failed.is_empty(),
        "synthetic bogus DNSSEC upstream must fire dns_authority_dnssec_failed (got {} total events: {:?})",
        events.len(),
        events.iter().map(|e| &e.ty).collect::<Vec<_>>()
    );
    let payload = dnssec_failed[0].data.as_ref().expect("event has data");
    assert_eq!(payload["reason"], "validation_failed");
    assert_eq!(payload["failClosed"], false);
    assert_eq!(payload["trustAnchorSource"], "iana-default");
    assert_eq!(payload["resolverId"], "resolver-do53-internal");
    assert_eq!(payload["hostname"], "bogus.example.com");
    assert_eq!(payload["cellId"], "it-cell-dnssec");
    assert_eq!(payload["correlationId"], "corr-it-001");
    // SEC-21 Phase 3h.1 — additive `source` discriminator. The
    // resolver-refresh path stamps `"resolver_refresh"` (the dataplane
    // surface stamps `"dataplane"`). Asserting the literal here locks
    // in the contract so a future emitter regression is caught at this
    // test layer instead of leaking into the SIEM dispatch path.
    assert_eq!(
        payload["source"], "resolver_refresh",
        "P3h resolver-refresh emissions must stamp source=resolver_refresh (P3h.1 contract)"
    );

    // Drift event must also fire (audit-only kept the answer) and
    // carry dnssecStatus="validation_failed" so SIEM correlation works.
    let drift_events: Vec<_> = events
        .iter()
        .filter(|e| e.ty.ends_with("dns_authority_drift"))
        .collect();
    assert!(
        !drift_events.is_empty(),
        "audit-only mode keeps the answer; drift must still fire"
    );
    let drift_data = drift_events[0].data.as_ref().expect("drift has data");
    assert_eq!(drift_data["dnssecStatus"], "validation_failed");

    // Suppress the unused-import warning for DnsRebindingPolicy when
    // reachability changes in the future; keep the import for forward
    // additions to this test (e.g. a combined DNSSEC + rebinding scenario).
    let _ = std::mem::size_of::<DnsRebindingPolicy>();
}