use anyhow::Result;
use async_trait::async_trait;
use futures::Stream;
use serde::{Deserialize, Serialize};
use std::pin::Pin;
use tokio_util::sync::CancellationToken;
mod metadata;
pub use metadata::{DiscoveryMetadata, MetadataSnapshot};
mod mock;
pub use mock::{MockDiscovery, SharedMockRegistry};
mod kv_store;
pub use kv_store::KVStoreDiscovery;
mod kube;
pub use kube::{KubeDiscoveryClient, hash_pod_name};
pub mod utils;
use crate::component::TransportType;
pub use utils::watch_and_extract_field;
/// Filter describing which discovery entries a lookup is interested in.
///
/// The variants form two parallel families, one for endpoints and one for
/// models. Within each family the variants carry progressively more
/// identifying fields: none, `namespace`, `namespace` + `component`, and
/// `namespace` + `component` + `endpoint`.
///
/// NOTE(review): how each backend matches entries against these fields is not
/// visible in this file; confirm against the individual implementations.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum DiscoveryQuery {
/// Endpoint query carrying no filter fields.
AllEndpoints,
/// Endpoint query narrowed by `namespace`.
NamespacedEndpoints {
namespace: String,
},
/// Endpoint query narrowed by `namespace` and `component`.
ComponentEndpoints {
namespace: String,
component: String,
},
/// Endpoint query naming a full `namespace` / `component` / `endpoint` triple.
Endpoint {
namespace: String,
component: String,
endpoint: String,
},
/// Model query carrying no filter fields.
AllModels,
/// Model query narrowed by `namespace`.
NamespacedModels {
namespace: String,
},
/// Model query narrowed by `namespace` and `component`.
ComponentModels {
namespace: String,
component: String,
},
/// Model query naming a full `namespace` / `component` / `endpoint` triple.
EndpointModels {
namespace: String,
component: String,
endpoint: String,
},
}
/// Description of an entry to be registered, before an instance id is attached.
///
/// A spec holds the same fields as the corresponding [`DiscoveryInstance`]
/// variant except for `instance_id`; `DiscoverySpec::with_instance_id` supplies
/// that id and produces the instance.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DiscoverySpec {
/// An endpoint identified by `namespace` / `component` / `endpoint`, together
/// with the `TransportType` it is associated with.
Endpoint {
namespace: String,
component: String,
endpoint: String,
transport: TransportType,
},
/// A model attached to a `namespace` / `component` / `endpoint` triple.
/// `card_json` holds the model card as an untyped JSON value; see
/// `DiscoverySpec::from_model` for building it from a serializable type.
Model {
namespace: String,
component: String,
endpoint: String,
card_json: serde_json::Value,
},
}
impl DiscoverySpec {
pub fn from_model<T>(
namespace: String,
component: String,
endpoint: String,
card: &T,
) -> Result<Self>
where
T: Serialize,
{
let card_json = serde_json::to_value(card)?;
Ok(Self::Model {
namespace,
component,
endpoint,
card_json,
})
}
pub fn with_instance_id(self, instance_id: u64) -> DiscoveryInstance {
match self {
Self::Endpoint {
namespace,
component,
endpoint,
transport,
} => DiscoveryInstance::Endpoint(crate::component::Instance {
namespace,
component,
endpoint,
instance_id,
transport,
}),
Self::Model {
namespace,
component,
endpoint,
card_json,
} => DiscoveryInstance::Model {
namespace,
component,
endpoint,
instance_id,
card_json,
},
}
}
}
/// A registered discovery entry: a [`DiscoverySpec`] plus an `instance_id`.
///
/// Serialized using serde's internally tagged representation: the variant name
/// is stored under a `"type"` key next to the variant's own fields.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(tag = "type")]
pub enum DiscoveryInstance {
/// A registered endpoint, represented by the component-level `Instance`.
Endpoint(crate::component::Instance),
/// A registered model. `card_json` is the untyped JSON model card; use
/// `DiscoveryInstance::deserialize_model` to decode it into a concrete type.
Model {
namespace: String,
component: String,
endpoint: String,
instance_id: u64,
card_json: serde_json::Value,
},
}
impl DiscoveryInstance {
    /// Returns the instance id stored in this entry, whichever variant it is.
    pub fn instance_id(&self) -> u64 {
        match self {
            Self::Endpoint(inst) => inst.instance_id,
            Self::Model { instance_id, .. } => *instance_id,
        }
    }

    /// Decodes the model card of a [`DiscoveryInstance::Model`] into `T`.
    ///
    /// # Errors
    ///
    /// Returns an error if this is an `Endpoint` instance (which has no model
    /// card), or if `card_json` does not match the shape expected by `T`.
    pub fn deserialize_model<T>(&self) -> Result<T>
    where
        T: for<'de> Deserialize<'de>,
    {
        match self {
            // `&serde_json::Value` is itself a `Deserializer`, so the card is
            // decoded by reference instead of deep-cloning the JSON tree first.
            Self::Model { card_json, .. } => Ok(T::deserialize(card_json)?),
            Self::Endpoint(_) => {
                anyhow::bail!("Cannot deserialize model from Endpoint instance")
            }
        }
    }
}
/// A single change notification delivered on a [`DiscoveryStream`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DiscoveryEvent {
/// An entry became visible; carries the full instance.
Added(DiscoveryInstance),
/// An entry went away; carries only a `u64`.
/// NOTE(review): presumably the `instance_id` of the removed entry — confirm
/// against the backends that emit this event.
Removed(u64),
}
/// Boxed, pinned, `Send` stream whose items are `Result<DiscoveryEvent>`;
/// this is the return type of `Discovery::list_and_watch`.
pub type DiscoveryStream = Pin<Box<dyn Stream<Item = Result<DiscoveryEvent>> + Send>>;
/// Backend-agnostic interface for registering and looking up discovery entries.
///
/// Implementors must be `Send + Sync`. The async methods are provided through
/// the `async_trait` attribute.
#[async_trait]
pub trait Discovery: Send + Sync {
/// Returns the `u64` instance id associated with this discovery client.
/// NOTE(review): how the id is chosen is backend-specific and not visible here.
fn instance_id(&self) -> u64;
/// Registers `spec` and returns the resulting [`DiscoveryInstance`].
/// NOTE(review): presumably the returned instance carries `self.instance_id()`
/// — confirm against implementations.
async fn register(&self, spec: DiscoverySpec) -> Result<DiscoveryInstance>;
/// Returns the instances selected by `query` as a one-shot `Vec`.
async fn list(&self, query: DiscoveryQuery) -> Result<Vec<DiscoveryInstance>>;
/// Returns a [`DiscoveryStream`] of events for the entries selected by `query`.
///
/// `cancel_token` is optional.
/// NOTE(review): the name suggests existing entries are emitted first and
/// later changes follow, and that cancelling the token ends the stream; both
/// are implementation details not shown in this file — confirm per backend.
async fn list_and_watch(
&self,
query: DiscoveryQuery,
cancel_token: Option<CancellationToken>,
) -> Result<DiscoveryStream>;
}