cellos-supervisor 0.5.1

CellOS execution-cell runner — boots cells in Firecracker microVMs or gVisor, enforces narrow typed authority, emits signed CloudEvents.
Documentation
//! SEC-21 Phase 2 — bridge from the ticker's synchronous
//! [`super::ticker::DriftEmitter`] trait to the supervisor's async
//! [`cellos_core::ports::EventSink`] pipeline.
//!
//! Mirrors [`crate::dns_proxy::spawn::EventSinkEmitter`] one-for-one. The
//! ticker's `emit` happens on the spawned tokio task — which has a runtime
//! context — so capturing a [`tokio::runtime::Handle`] here is mostly
//! defensive; the abstraction stays explicit so a future refactor that
//! moves the ticker onto a bare `std::thread` still works without changing
//! call sites.
//!
//! As with the proxy emitter, both the primary and (optional) JSONL sinks
//! are dispatched fire-and-forget via `Handle::spawn`. The ticker loop
//! must never block on emit completion — if a sink stalls, drift events
//! get queued in tokio's spawn queue, but the next refresh still happens
//! on schedule.

use std::sync::Arc;

use cellos_core::CloudEventV1;

use super::ticker::DriftEmitter;

/// Adapter from the ticker's synchronous [`DriftEmitter`] trait to the
/// supervisor's async [`cellos_core::ports::EventSink`] pipeline.
///
/// **Tokio-context constraint:** `runtime_handle` MUST be captured on a
/// thread that already has a tokio runtime context. The supervisor builds
/// this struct in async code via [`Self::capture_current`].
pub struct EventSinkEmitter {
    pub runtime_handle: tokio::runtime::Handle,
    pub sink: Arc<dyn cellos_core::ports::EventSink>,
    pub jsonl_sink: Option<Arc<dyn cellos_core::ports::EventSink>>,
}

impl EventSinkEmitter {
    /// Build an emitter capturing the **current** tokio runtime handle.
    ///
    /// Panics with a clear message if no tokio runtime is reachable —
    /// this struct exists specifically to bridge sync call-sites to async
    /// sinks, but the bridge itself must be constructed inside a runtime.
    pub fn capture_current(
        sink: Arc<dyn cellos_core::ports::EventSink>,
        jsonl_sink: Option<Arc<dyn cellos_core::ports::EventSink>>,
    ) -> Self {
        let handle = tokio::runtime::Handle::try_current().expect(
            "resolver_refresh::sink_emitter::EventSinkEmitter::capture_current called outside a tokio runtime context",
        );
        Self {
            runtime_handle: handle,
            sink,
            jsonl_sink,
        }
    }
}

impl DriftEmitter for EventSinkEmitter {
    fn emit(&self, event: CloudEventV1) {
        // Fire-and-forget: drift events are observability, not control
        // flow. A sink failure logs and moves on; the ticker never
        // blocks on emit completion.
        let sink = self.sink.clone();
        let jsonl = self.jsonl_sink.clone();
        let event_for_jsonl = event.clone();
        self.runtime_handle.spawn(async move {
            if let Err(e) = sink.emit(&event).await {
                tracing::warn!(
                    target: "cellos.supervisor.resolver_refresh.ticker",
                    error = %e,
                    "primary sink emit failed for dns_authority_drift event"
                );
            }
        });
        if let Some(j) = jsonl {
            self.runtime_handle.spawn(async move {
                if let Err(e) = j.emit(&event_for_jsonl).await {
                    tracing::warn!(
                        target: "cellos.supervisor.resolver_refresh.ticker",
                        error = %e,
                        "jsonl sink emit failed for dns_authority_drift event"
                    );
                }
            });
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;

    /// Minimal in-memory sink for the EventSinkEmitter test.
    struct CountingSink {
        count: Mutex<u64>,
    }
    #[async_trait::async_trait]
    impl cellos_core::ports::EventSink for CountingSink {
        async fn emit(&self, _event: &CloudEventV1) -> Result<(), cellos_core::error::CellosError> {
            *self.count.lock().unwrap() += 1;
            Ok(())
        }
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn drift_event_sink_emitter_dispatches_to_runtime() {
        let sink = Arc::new(CountingSink {
            count: Mutex::new(0),
        });
        let emitter = EventSinkEmitter::capture_current(sink.clone(), None);
        let event = CloudEventV1 {
            specversion: "1.0".into(),
            id: "evt-drift-1".into(),
            source: "test".into(),
            ty: "dev.cellos.events.cell.observability.v1.dns_authority_drift".into(),
            datacontenttype: Some("application/json".into()),
            data: Some(serde_json::json!({"k": "v"})),
            time: Some(chrono::Utc::now().to_rfc3339()),
            traceparent: None,
        };
        emitter.emit(event);
        // Yield until the spawned task runs.
        for _ in 0..50 {
            if *sink.count.lock().unwrap() >= 1 {
                break;
            }
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        }
        assert_eq!(
            *sink.count.lock().unwrap(),
            1,
            "sink should have received one event via the runtime handle"
        );
    }
}