use super::{Indexer, worker_query::WorkerQueryClient};
use anyhow::Result;
use dynamo_kv_router::{
config::KvRouterConfig,
protocols::{KV_EVENT_SUBJECT, RouterEvent},
};
use dynamo_runtime::{
component::Component, discovery::EventTransportKind, prelude::*,
transports::event_plane::EventSubscriber,
};
async fn start_kv_router_background_event_plane(
component: Component,
indexer: Indexer,
transport_kind: EventTransportKind,
) -> Result<()> {
let cancellation_token = component.drt().primary_token();
let mut subscriber =
EventSubscriber::for_component_with_transport(&component, KV_EVENT_SUBJECT, transport_kind)
.await?
.typed::<RouterEvent>();
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let worker_query_client = WorkerQueryClient::spawn(component.clone(), indexer).await?;
let kv_event_subject = format!(
"namespace.{}.component.{}.{}",
component.namespace().name(),
component.name(),
KV_EVENT_SUBJECT
);
match transport_kind {
EventTransportKind::Nats => {
tracing::info!(
subject = %kv_event_subject,
"KV Router using NATS Core subscription (local_indexer mode)"
);
}
EventTransportKind::Zmq => {
tracing::info!(
subject = %kv_event_subject,
"KV Router using ZMQ event plane subscription (local_indexer mode)"
);
}
}
tokio::spawn(async move {
loop {
tokio::select! {
biased;
_ = cancellation_token.cancelled() => {
tracing::debug!("KV Router event plane background task received cancellation signal");
break;
}
Some(result) = subscriber.next() => {
let (envelope, event) = match result {
Ok((envelope, event)) => (envelope, event),
Err(e) => {
tracing::warn!("Failed to receive RouterEvent from event plane: {e:?}");
continue;
}
};
tracing::trace!(
"Received event from publisher {} (seq {})",
envelope.publisher_id,
envelope.sequence
);
tracing::trace!(
"Forwarding live event to recovery coordinator for worker {} dp_rank {} event_id {}",
event.worker_id,
event.event.dp_rank,
event.event.event_id
);
worker_query_client.handle_live_event(event).await;
}
}
}
tracing::debug!("KV Router event plane background task exiting");
});
Ok(())
}
pub async fn start_subscriber(
component: Component,
kv_router_config: &KvRouterConfig,
indexer: Indexer,
) -> Result<()> {
let transport_kind = EventTransportKind::from_env_or_default();
if kv_router_config.durable_kv_events {
tracing::warn!(
"--durable-kv-events is deprecated and will be removed in a future release. \
The event-plane subscriber (local_indexer mode) is now the recommended path."
);
if transport_kind == EventTransportKind::Zmq {
tracing::warn!(
"--durable-kv-events requires NATS, but ZMQ event plane is configured; falling back to JetStream anyway"
);
}
tracing::info!("Using JetStream subscription (--durable-kv-events enabled)");
let consumer_id = component.drt().discovery().instance_id().to_string();
super::jetstream::start_kv_router_background(
component,
consumer_id,
indexer,
kv_router_config,
)
.await
} else {
if transport_kind == EventTransportKind::Zmq {
if kv_router_config.router_snapshot_threshold.is_some()
|| kv_router_config.router_reset_states
{
tracing::warn!(
"ZMQ event plane does not support KV snapshots or state reset; ignoring snapshot/reset settings"
);
}
tracing::info!("Using ZMQ event plane subscription (local_indexer mode)");
} else {
tracing::info!("Using NATS Core subscription (local_indexer mode)");
}
start_kv_router_background_event_plane(component, indexer, transport_kind).await
}
}