use anyhow::Result;
use async_trait::async_trait;
use futures::Stream;
use serde::{Deserialize, Serialize};
use std::pin::Pin;
use tokio_util::sync::CancellationToken;
mod metadata;
pub use metadata::{DiscoveryMetadata, MetadataSnapshot};
mod mock;
pub use mock::{MockDiscovery, SharedMockRegistry};
mod kv_store;
pub use kv_store::KVStoreDiscovery;
mod kube;
pub use kube::{KubeDiscoveryClient, hash_pod_name};
pub mod utils;
use crate::component::TransportType;
pub use utils::watch_and_extract_field;
/// Identifies which event transport is in use.
///
/// Serde renames the variants to `snake_case` (`"nats"`, `"zmq"`), and
/// `Nats` is the `Default` value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum EventTransportKind {
/// NATS transport; this is the default variant.
#[default]
Nats,
/// ZMQ transport.
Zmq,
}
impl EventTransportKind {
    /// Reads the transport kind from the environment variable named by
    /// `environment_names::event_plane::DYN_EVENT_PLANE`.
    ///
    /// A variable that cannot be read (unset or not valid Unicode) or that is
    /// empty selects [`Self::Nats`]; `"nats"` and `"zmq"` select the matching
    /// variant. Matching is exact (case-sensitive, no trimming).
    ///
    /// # Errors
    ///
    /// Returns an error quoting the offending value for any other string.
    pub fn from_env() -> Result<Self> {
        let value =
            match std::env::var(crate::config::environment_names::event_plane::DYN_EVENT_PLANE) {
                Ok(value) => value,
                // Nothing usable in the environment: fall back to NATS.
                Err(_) => return Ok(Self::Nats),
            };

        match value.as_str() {
            "" | "nats" => Ok(Self::Nats),
            "zmq" => Ok(Self::Zmq),
            other => anyhow::bail!(
                "Invalid DYN_EVENT_PLANE value '{}'. Valid values: 'nats', 'zmq'",
                other
            ),
        }
    }

    /// Infallible variant of [`Self::from_env`]: an invalid value is logged
    /// as a warning and [`Self::Nats`] is returned instead.
    pub fn from_env_or_default() -> Self {
        match Self::from_env() {
            Ok(kind) => kind,
            Err(e) => {
                tracing::warn!("{}, defaulting to NATS", e);
                Self::Nats
            }
        }
    }

    /// Returns the codec used for this transport when none is configured
    /// explicitly: JSON for NATS, MessagePack for ZMQ.
    pub fn default_codec(&self) -> EventCodecKind {
        match *self {
            Self::Nats => EventCodecKind::Json,
            Self::Zmq => EventCodecKind::Msgpack,
        }
    }
}
/// Identifies the codec used for event payloads.
///
/// Serde renames the variants to `snake_case` (`"json"`, `"msgpack"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum EventCodecKind {
/// JSON codec; the default codec for `EventTransportKind::Nats`.
Json,
/// Msgpack codec; the default codec for `EventTransportKind::Zmq`.
Msgpack,
}
impl EventCodecKind {
    /// Reads an explicit codec choice from the environment variable named by
    /// `environment_names::event_plane::DYN_EVENT_PLANE_CODEC`.
    ///
    /// Returns `Ok(None)` when the variable cannot be read or is empty, and
    /// `Ok(Some(_))` for the exact strings `"json"` and `"msgpack"`.
    ///
    /// # Errors
    ///
    /// Returns an error quoting the offending value for any other string.
    pub fn from_env() -> Result<Option<Self>> {
        let value = match std::env::var(
            crate::config::environment_names::event_plane::DYN_EVENT_PLANE_CODEC,
        ) {
            Ok(value) => value,
            // No readable value means "no explicit choice".
            Err(_) => return Ok(None),
        };

        let codec = match value.as_str() {
            "" => None,
            "json" => Some(Self::Json),
            "msgpack" => Some(Self::Msgpack),
            other => anyhow::bail!(
                "Invalid DYN_EVENT_PLANE_CODEC value '{}'. Valid values: 'json', 'msgpack'",
                other
            ),
        };
        Ok(codec)
    }

    /// Resolves the codec to use for `transport`.
    ///
    /// An explicit, valid environment setting wins. If the setting is absent,
    /// the transport's default codec is used; if it is invalid, a warning is
    /// logged and the transport's default codec is used.
    pub fn from_env_or_transport_default(transport: EventTransportKind) -> Self {
        let fallback = transport.default_codec();
        match Self::from_env() {
            Ok(Some(codec)) => codec,
            Ok(None) => fallback,
            Err(e) => {
                tracing::warn!(
                    "{}, defaulting to {:?} for {:?}",
                    e,
                    fallback,
                    transport
                );
                fallback
            }
        }
    }
}
/// Connection details for an event transport, one variant per flavor.
///
/// Serde uses the adjacently tagged representation: the variant name is
/// stored under `"kind"` and the variant's fields under `"config"`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(tag = "kind", content = "config")]
pub enum EventTransport {
/// NATS transport settings.
Nats {
/// Subject prefix; this is what `address()` returns for this variant.
subject_prefix: String,
},
/// ZMQ transport settings with a single endpoint.
Zmq {
/// Endpoint string; this is what `address()` returns for this variant.
endpoint: String,
},
/// ZMQ transport settings with two endpoint lists.
/// NOTE(review): the field names suggest a broker exposing XSUB/XPUB
/// sockets — confirm against the broker implementation.
ZmqBroker {
/// `address()` returns the first entry of this list (or `""` if empty).
xsub_endpoints: Vec<String>,
/// Second endpoint list; not consulted by `kind()` or `address()`.
xpub_endpoints: Vec<String>,
},
}
impl EventTransport {
    /// Returns the transport family of this configuration.
    ///
    /// Both `Zmq` and `ZmqBroker` report [`EventTransportKind::Zmq`].
    pub fn kind(&self) -> EventTransportKind {
        match self {
            Self::Nats { .. } => EventTransportKind::Nats,
            Self::Zmq { .. } => EventTransportKind::Zmq,
            Self::ZmqBroker { .. } => EventTransportKind::Zmq,
        }
    }

    /// Builds a `Nats` transport with the given subject prefix.
    pub fn nats(subject_prefix: impl Into<String>) -> Self {
        let subject_prefix = subject_prefix.into();
        Self::Nats { subject_prefix }
    }

    /// Builds a `Zmq` transport with the given endpoint.
    pub fn zmq(endpoint: impl Into<String>) -> Self {
        let endpoint = endpoint.into();
        Self::Zmq { endpoint }
    }

    /// Returns the primary address string of this transport.
    ///
    /// This is the subject prefix for `Nats`, the endpoint for `Zmq`, and the
    /// first `xsub_endpoints` entry for `ZmqBroker`. A broker with no XSUB
    /// endpoints yields the empty string.
    pub fn address(&self) -> &str {
        match self {
            Self::Nats { subject_prefix } => subject_prefix.as_str(),
            Self::Zmq { endpoint } => endpoint.as_str(),
            Self::ZmqBroker { xsub_endpoints, .. } => {
                xsub_endpoints.first().map_or("", String::as_str)
            }
        }
    }
}
/// Selects what a discovery `list` / `list_and_watch` call should return.
///
/// The endpoint and model families each come in four scopes, from "all"
/// down to a single namespace/component/endpoint triple. Event channels are
/// selected through an [`EventChannelQuery`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum DiscoveryQuery {
/// Endpoints with no scope restriction.
AllEndpoints,
/// Endpoints scoped to one namespace.
NamespacedEndpoints {
namespace: String,
},
/// Endpoints scoped to one component of a namespace.
ComponentEndpoints {
namespace: String,
component: String,
},
/// A single namespace/component/endpoint triple.
Endpoint {
namespace: String,
component: String,
endpoint: String,
},
/// Models with no scope restriction.
AllModels,
/// Models scoped to one namespace.
NamespacedModels {
namespace: String,
},
/// Models scoped to one component of a namespace.
ComponentModels {
namespace: String,
component: String,
},
/// Models scoped to a single namespace/component/endpoint triple.
EndpointModels {
namespace: String,
component: String,
endpoint: String,
},
/// Event channels matching the wrapped query.
EventChannels(EventChannelQuery),
}
/// Filter for event channels; every field that is `None` is left unset.
///
/// The constructors fill the fields hierarchically: namespace, then
/// component, then topic.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct EventChannelQuery {
    /// Namespace filter, if any.
    pub namespace: Option<String>,
    /// Component filter, if any.
    pub component: Option<String>,
    /// Topic filter, if any.
    pub topic: Option<String>,
}

impl EventChannelQuery {
    /// Query with no filter set.
    pub fn all() -> Self {
        Self {
            namespace: None,
            component: None,
            topic: None,
        }
    }

    /// Query with only the namespace set.
    pub fn namespace(namespace: impl Into<String>) -> Self {
        let mut query = Self::all();
        query.namespace = Some(namespace.into());
        query
    }

    /// Query with the namespace and component set.
    pub fn component(namespace: impl Into<String>, component: impl Into<String>) -> Self {
        let mut query = Self::namespace(namespace);
        query.component = Some(component.into());
        query
    }

    /// Query with namespace, component and topic all set.
    pub fn topic(
        namespace: impl Into<String>,
        component: impl Into<String>,
        topic: impl Into<String>,
    ) -> Self {
        let mut query = Self::component(namespace, component);
        query.topic = Some(topic.into());
        query
    }

    /// Returns how specific the query is, judged by the most specific field
    /// that is set: `3` for topic, `2` for component, `1` for namespace and
    /// `0` when nothing is set.
    pub fn scope_level(&self) -> u8 {
        match (&self.topic, &self.component, &self.namespace) {
            (Some(_), _, _) => 3,
            (None, Some(_), _) => 2,
            (None, None, Some(_)) => 1,
            (None, None, None) => 0,
        }
    }
}
/// Describes something to be registered with discovery, before an instance
/// id has been attached.
///
/// `with_instance_id` converts a spec into the matching `DiscoveryInstance`
/// variant by adding the id.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DiscoverySpec {
/// An endpoint identified by namespace/component/endpoint plus its transport.
Endpoint {
namespace: String,
component: String,
endpoint: String,
transport: TransportType,
},
/// A model attached to a namespace/component/endpoint.
Model {
namespace: String,
component: String,
endpoint: String,
/// The model card, stored as a JSON value (see `from_model`).
card_json: serde_json::Value,
/// Optional suffix carried through to the model's instance id.
model_suffix: Option<String>,
},
/// An event channel identified by namespace/component/topic plus its transport.
EventChannel {
namespace: String,
component: String,
topic: String,
transport: EventTransport,
},
}
impl DiscoverySpec {
    /// Builds a `Model` spec without a suffix by serializing `card` to JSON.
    ///
    /// # Errors
    ///
    /// Fails if `card` cannot be converted to a `serde_json::Value`.
    pub fn from_model<T>(
        namespace: String,
        component: String,
        endpoint: String,
        card: &T,
    ) -> Result<Self>
    where
        T: Serialize,
    {
        let no_suffix = None;
        Self::from_model_with_suffix(namespace, component, endpoint, card, no_suffix)
    }

    /// Builds a `Model` spec by serializing `card` to JSON, carrying the
    /// optional `model_suffix` along unchanged.
    ///
    /// # Errors
    ///
    /// Fails if `card` cannot be converted to a `serde_json::Value`.
    pub fn from_model_with_suffix<T>(
        namespace: String,
        component: String,
        endpoint: String,
        card: &T,
        model_suffix: Option<String>,
    ) -> Result<Self>
    where
        T: Serialize,
    {
        Ok(Self::Model {
            card_json: serde_json::to_value(card)?,
            namespace,
            component,
            endpoint,
            model_suffix,
        })
    }

    /// Consumes the spec and attaches `instance_id`, producing the
    /// corresponding `DiscoveryInstance` variant with all other fields moved
    /// across unchanged.
    pub fn with_instance_id(self, instance_id: u64) -> DiscoveryInstance {
        match self {
            Self::Endpoint {
                namespace,
                component,
                endpoint,
                transport,
            } => {
                let instance = crate::component::Instance {
                    namespace,
                    component,
                    endpoint,
                    instance_id,
                    transport,
                };
                DiscoveryInstance::Endpoint(instance)
            }
            Self::Model {
                namespace,
                component,
                endpoint,
                card_json,
                model_suffix,
            } => DiscoveryInstance::Model {
                instance_id,
                namespace,
                component,
                endpoint,
                card_json,
                model_suffix,
            },
            Self::EventChannel {
                namespace,
                component,
                topic,
                transport,
            } => DiscoveryInstance::EventChannel {
                instance_id,
                namespace,
                component,
                topic,
                transport,
            },
        }
    }
}
/// A registered discovery entry: a `DiscoverySpec` plus its instance id.
///
/// Serde uses the internally tagged representation with the variant name
/// stored under `"type"`.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(tag = "type")]
pub enum DiscoveryInstance {
/// A registered endpoint, represented by `crate::component::Instance`.
Endpoint(crate::component::Instance),
/// A registered model.
Model {
namespace: String,
component: String,
endpoint: String,
instance_id: u64,
/// The model card as JSON; `deserialize_model` decodes it into a typed value.
card_json: serde_json::Value,
/// Optional suffix. Omitted from serialized output when `None`, and
/// defaults to `None` when absent from the input.
#[serde(default, skip_serializing_if = "Option::is_none")]
model_suffix: Option<String>,
},
/// A registered event channel.
EventChannel {
namespace: String,
component: String,
topic: String,
instance_id: u64,
transport: EventTransport,
},
}
impl DiscoveryInstance {
    /// Returns the instance id stored in whichever variant this is.
    pub fn instance_id(&self) -> u64 {
        match self {
            Self::Endpoint(inst) => inst.instance_id,
            Self::Model { instance_id, .. } | Self::EventChannel { instance_id, .. } => {
                *instance_id
            }
        }
    }

    /// Decodes the stored model card JSON into `T`.
    ///
    /// # Errors
    ///
    /// Fails if this is not a `Model` instance, or if the JSON does not
    /// deserialize into `T`.
    pub fn deserialize_model<T>(&self) -> Result<T>
    where
        T: for<'de> Deserialize<'de>,
    {
        let card_json = match self {
            Self::Model { card_json, .. } => card_json,
            Self::Endpoint(_) => {
                anyhow::bail!("Cannot deserialize model from Endpoint instance")
            }
            Self::EventChannel { .. } => {
                anyhow::bail!("Cannot deserialize model from EventChannel instance")
            }
        };
        // `from_value` consumes its argument, so work on a copy of the card.
        let card: T = serde_json::from_value(card_json.clone())?;
        Ok(card)
    }

    /// Builds the identifier for this instance: the naming fields and the
    /// instance id, without the payload (transport or model card).
    pub fn id(&self) -> DiscoveryInstanceId {
        match self {
            Self::Endpoint(inst) => {
                let key = EndpointInstanceId {
                    namespace: inst.namespace.clone(),
                    component: inst.component.clone(),
                    endpoint: inst.endpoint.clone(),
                    instance_id: inst.instance_id,
                };
                DiscoveryInstanceId::Endpoint(key)
            }
            Self::Model {
                namespace,
                component,
                endpoint,
                instance_id,
                model_suffix,
                ..
            } => {
                let key = ModelCardInstanceId {
                    namespace: namespace.clone(),
                    component: component.clone(),
                    endpoint: endpoint.clone(),
                    instance_id: *instance_id,
                    model_suffix: model_suffix.clone(),
                };
                DiscoveryInstanceId::Model(key)
            }
            Self::EventChannel {
                namespace,
                component,
                topic,
                instance_id,
                ..
            } => {
                let key = EventChannelInstanceId {
                    namespace: namespace.clone(),
                    component: component.clone(),
                    topic: topic.clone(),
                    instance_id: *instance_id,
                };
                DiscoveryInstanceId::EventChannel(key)
            }
        }
    }
}
/// Identifier of an endpoint instance: its namespace/component/endpoint
/// names plus the numeric instance id.
///
/// `to_path` / `from_path` convert to and from a `/`-separated string form.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct EndpointInstanceId {
pub namespace: String,
pub component: String,
pub endpoint: String,
/// Numeric instance id; rendered as lowercase hex by `to_path`.
pub instance_id: u64,
}
impl EndpointInstanceId {
    /// Renders the id as `namespace/component/endpoint/<instance id>`, with
    /// the instance id in lowercase hexadecimal and no prefix.
    pub fn to_path(&self) -> String {
        let Self {
            namespace,
            component,
            endpoint,
            instance_id,
        } = self;
        format!(
            "{}/{}/{}/{:x}",
            namespace, component, endpoint, instance_id
        )
    }

    /// Parses the string form produced by [`Self::to_path`].
    ///
    /// # Errors
    ///
    /// Fails unless `path` splits on `/` into exactly four parts and the
    /// last part is a hexadecimal `u64`.
    pub fn from_path(path: &str) -> Result<Self> {
        let segments: Vec<&str> = path.split('/').collect();
        match segments.as_slice() {
            [namespace, component, endpoint, id_hex] => {
                let instance_id = u64::from_str_radix(id_hex, 16)
                    .map_err(|e| anyhow::anyhow!("Invalid instance_id hex: {}", e))?;
                Ok(Self {
                    namespace: (*namespace).to_owned(),
                    component: (*component).to_owned(),
                    endpoint: (*endpoint).to_owned(),
                    instance_id,
                })
            }
            other => anyhow::bail!(
                "Invalid EndpointInstanceId path: expected 4 parts, got {}",
                other.len()
            ),
        }
    }
}
/// Identifier of a model card instance: namespace/component/endpoint names,
/// the numeric instance id, and an optional suffix.
///
/// `to_path` / `from_path` convert to and from a `/`-separated string form.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct ModelCardInstanceId {
pub namespace: String,
pub component: String,
pub endpoint: String,
/// Numeric instance id; rendered as lowercase hex by `to_path`.
pub instance_id: u64,
/// Optional suffix; when present, `to_path` appends it as a fifth part.
pub model_suffix: Option<String>,
}
/// Identifier of an event channel instance: its namespace/component/topic
/// names plus the numeric instance id.
///
/// `to_path` / `from_path` convert to and from a `/`-separated string form.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct EventChannelInstanceId {
pub namespace: String,
pub component: String,
pub topic: String,
/// Numeric instance id; rendered as lowercase hex by `to_path`.
pub instance_id: u64,
}
impl EventChannelInstanceId {
    /// Renders the id as `namespace/component/topic/<instance id>`, with the
    /// instance id in lowercase hexadecimal and no prefix.
    pub fn to_path(&self) -> String {
        let Self {
            namespace,
            component,
            topic,
            instance_id,
        } = self;
        format!(
            "{}/{}/{}/{:x}",
            namespace, component, topic, instance_id
        )
    }

    /// Parses the string form produced by [`Self::to_path`].
    ///
    /// # Errors
    ///
    /// Fails unless `path` splits on `/` into exactly four parts and the
    /// last part is a hexadecimal `u64`.
    pub fn from_path(path: &str) -> Result<Self> {
        let segments: Vec<&str> = path.split('/').collect();
        match segments.as_slice() {
            [namespace, component, topic, id_hex] => {
                let instance_id = u64::from_str_radix(id_hex, 16)
                    .map_err(|e| anyhow::anyhow!("Invalid instance_id hex: {}", e))?;
                Ok(Self {
                    namespace: (*namespace).to_owned(),
                    component: (*component).to_owned(),
                    topic: (*topic).to_owned(),
                    instance_id,
                })
            }
            other => anyhow::bail!(
                "Invalid EventChannelInstanceId path: expected 4 parts, got {}",
                other.len()
            ),
        }
    }
}
impl ModelCardInstanceId {
    /// Renders the id as `namespace/component/endpoint/<instance id>`, with
    /// the instance id in lowercase hexadecimal, followed by `/<suffix>` when
    /// a model suffix is present.
    pub fn to_path(&self) -> String {
        if let Some(suffix) = &self.model_suffix {
            format!(
                "{}/{}/{}/{:x}/{}",
                self.namespace, self.component, self.endpoint, self.instance_id, suffix
            )
        } else {
            format!(
                "{}/{}/{}/{:x}",
                self.namespace, self.component, self.endpoint, self.instance_id
            )
        }
    }

    /// Parses the string form produced by [`Self::to_path`].
    ///
    /// A fifth `/`-separated part, when present, becomes the model suffix.
    ///
    /// # Errors
    ///
    /// Fails unless `path` splits on `/` into four or five parts and the
    /// fourth part is a hexadecimal `u64`.
    pub fn from_path(path: &str) -> Result<Self> {
        let segments: Vec<&str> = path.split('/').collect();
        let count = segments.len();
        if !(4..=5).contains(&count) {
            anyhow::bail!(
                "Invalid ModelCardInstanceId path: expected 4 or 5 parts, got {}",
                count
            );
        }

        let instance_id = u64::from_str_radix(segments[3], 16)
            .map_err(|e| anyhow::anyhow!("Invalid instance_id hex: {}", e))?;
        let model_suffix = segments.get(4).map(|suffix| (*suffix).to_owned());

        Ok(Self {
            namespace: segments[0].to_owned(),
            component: segments[1].to_owned(),
            endpoint: segments[2].to_owned(),
            instance_id,
            model_suffix,
        })
    }
}
/// Identifier of a discovery instance, one variant per instance kind.
///
/// Mirrors the variants of `DiscoveryInstance`; `DiscoveryInstance::id`
/// produces the matching variant.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum DiscoveryInstanceId {
/// Identifier of an endpoint instance.
Endpoint(EndpointInstanceId),
/// Identifier of a model card instance.
Model(ModelCardInstanceId),
/// Identifier of an event channel instance.
EventChannel(EventChannelInstanceId),
}
impl DiscoveryInstanceId {
pub fn instance_id(&self) -> u64 {
match self {
Self::Endpoint(eid) => eid.instance_id,
Self::Model(mid) => mid.instance_id,
Self::EventChannel(ecid) => ecid.instance_id,
}
}
pub fn extract_endpoint_id(&self) -> Result<&EndpointInstanceId> {
match self {
Self::Endpoint(eid) => Ok(eid),
Self::Model(_) => anyhow::bail!("Expected Endpoint variant, got Model"),
Self::EventChannel(_) => anyhow::bail!("Expected Endpoint variant, got EventChannel"),
}
}
pub fn extract_model_id(&self) -> Result<&ModelCardInstanceId> {
match self {
Self::Model(mid) => Ok(mid),
Self::Endpoint(_) => anyhow::bail!("Expected Model variant, got Endpoint"),
Self::EventChannel(_) => anyhow::bail!("Expected Model variant, got EventChannel"),
}
}
pub fn extract_event_channel_id(&self) -> Result<&EventChannelInstanceId> {
match self {
Self::EventChannel(ecid) => Ok(ecid),
Self::Endpoint(_) => anyhow::bail!("Expected EventChannel variant, got Endpoint"),
Self::Model(_) => anyhow::bail!("Expected EventChannel variant, got Model"),
}
}
}
/// A change notification delivered on a `DiscoveryStream`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DiscoveryEvent {
/// An instance appeared; carries the full instance.
Added(DiscoveryInstance),
/// An instance went away; carries only its identifier.
Removed(DiscoveryInstanceId),
}
/// Boxed, pinned, `Send` stream of discovery events, where each item is a
/// `Result` so that errors can be reported in-band.
pub type DiscoveryStream = Pin<Box<dyn Stream<Item = Result<DiscoveryEvent>> + Send>>;
/// Interface implemented by discovery backends.
///
/// Implementations must be `Send + Sync`. The async methods are provided
/// through `async_trait`.
#[async_trait]
pub trait Discovery: Send + Sync {
/// Returns the instance id associated with this discovery client.
/// NOTE(review): presumably the id attached to specs in `register` — confirm
/// against the implementations.
fn instance_id(&self) -> u64;
/// Registers `spec` and returns the resulting `DiscoveryInstance`.
async fn register(&self, spec: DiscoverySpec) -> Result<DiscoveryInstance>;
/// Removes a previously registered `instance`.
async fn unregister(&self, instance: DiscoveryInstance) -> Result<()>;
/// Returns the instances currently matching `query`.
async fn list(&self, query: DiscoveryQuery) -> Result<Vec<DiscoveryInstance>>;
/// Returns a stream of `DiscoveryEvent`s for instances matching `query`.
///
/// NOTE(review): the name suggests the stream first reports existing
/// matches and then live changes, and that `cancel_token` stops the watch
/// when cancelled — confirm against the implementations.
async fn list_and_watch(
&self,
query: DiscoveryQuery,
cancel_token: Option<CancellationToken>,
) -> Result<DiscoveryStream>;
/// Shutdown hook; the default implementation does nothing.
fn shutdown(&self) {}
}