use std::sync::Arc;
use cellos_core::CloudEventV1;
use super::ticker::DriftEmitter;
/// Drift emitter that forwards events to one or two `EventSink`s by spawning
/// tasks on a stored tokio runtime handle.
pub struct EventSinkEmitter {
/// Handle of the tokio runtime on which the emit tasks are spawned.
pub runtime_handle: tokio::runtime::Handle,
/// Primary sink; receives every event passed to `emit`.
pub sink: Arc<dyn cellos_core::ports::EventSink>,
/// Optional secondary sink; when present it receives its own clone of each
/// event. NOTE(review): the name suggests a JSONL-backed sink — confirm
/// against the caller, nothing in this file constrains the implementation.
pub jsonl_sink: Option<Arc<dyn cellos_core::ports::EventSink>>,
}
impl EventSinkEmitter {
    /// Builds an emitter bound to the tokio runtime that is current at the
    /// call site.
    ///
    /// `sink` is the primary destination; `jsonl_sink` is an optional second
    /// destination. Both are stored as-is.
    ///
    /// # Panics
    ///
    /// Panics if no tokio runtime context is active on the calling thread,
    /// i.e. when `tokio::runtime::Handle::try_current()` returns an error.
    pub fn capture_current(
        sink: Arc<dyn cellos_core::ports::EventSink>,
        jsonl_sink: Option<Arc<dyn cellos_core::ports::EventSink>>,
    ) -> Self {
        use tokio::runtime::Handle;

        // Resolve the handle first so a missing runtime fails loudly at
        // construction time rather than later, when an event is emitted.
        let runtime_handle = Handle::try_current().expect(
            "resolver_refresh::sink_emitter::EventSinkEmitter::capture_current called outside a tokio runtime context",
        );

        Self {
            runtime_handle,
            sink,
            jsonl_sink,
        }
    }
}
impl DriftEmitter for EventSinkEmitter {
fn emit(&self, event: CloudEventV1) {
let sink = self.sink.clone();
let jsonl = self.jsonl_sink.clone();
let event_for_jsonl = event.clone();
self.runtime_handle.spawn(async move {
if let Err(e) = sink.emit(&event).await {
tracing::warn!(
target: "cellos.supervisor.resolver_refresh.ticker",
error = %e,
"primary sink emit failed for dns_authority_drift event"
);
}
});
if let Some(j) = jsonl {
self.runtime_handle.spawn(async move {
if let Err(e) = j.emit(&event_for_jsonl).await {
tracing::warn!(
target: "cellos.supervisor.resolver_refresh.ticker",
error = %e,
"jsonl sink emit failed for dns_authority_drift event"
);
}
});
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;
    use std::time::Duration;

    /// Test sink that only counts how many events it was asked to emit.
    struct CountingSink {
        count: Mutex<u64>,
    }

    impl CountingSink {
        /// Snapshot of the number of `emit` calls observed so far.
        fn observed(&self) -> u64 {
            *self.count.lock().unwrap()
        }
    }

    #[async_trait::async_trait]
    impl cellos_core::ports::EventSink for CountingSink {
        async fn emit(&self, _event: &CloudEventV1) -> Result<(), cellos_core::error::CellosError> {
            *self.count.lock().unwrap() += 1;
            Ok(())
        }
    }

    /// An event handed to the emitter must reach the primary sink through a
    /// task spawned on the captured runtime handle.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn drift_event_sink_emitter_dispatches_to_runtime() {
        let sink = Arc::new(CountingSink {
            count: Mutex::new(0),
        });
        let emitter = EventSinkEmitter::capture_current(sink.clone(), None);

        let drift_event = CloudEventV1 {
            specversion: "1.0".into(),
            id: "evt-drift-1".into(),
            source: "test".into(),
            ty: "dev.cellos.events.cell.observability.v1.dns_authority_drift".into(),
            datacontenttype: Some("application/json".into()),
            data: Some(serde_json::json!({"k": "v"})),
            time: Some(chrono::Utc::now().to_rfc3339()),
            traceparent: None,
        };
        emitter.emit(drift_event);

        // Delivery is asynchronous, so poll for it: at most 50 sleeps of
        // 10 ms each, stopping as soon as the sink has seen the event.
        let mut polls_left = 50;
        while polls_left > 0 && sink.observed() < 1 {
            polls_left -= 1;
            tokio::time::sleep(Duration::from_millis(10)).await;
        }

        assert_eq!(
            sink.observed(),
            1,
            "sink should have received one event via the runtime handle"
        );
    }
}