dynamo_runtime/discovery/
mod.rs1use anyhow::Result;
5use async_trait::async_trait;
6use futures::Stream;
7use serde::{Deserialize, Serialize};
8use std::pin::Pin;
9use tokio_util::sync::CancellationToken;
10
11mod metadata;
12pub use metadata::{DiscoveryMetadata, MetadataSnapshot};
13
14mod mock;
15pub use mock::{MockDiscovery, SharedMockRegistry};
16mod kv_store;
17pub use kv_store::KVStoreDiscovery;
18
19mod kube;
20pub use kube::{KubeDiscoveryClient, hash_pod_name};
21
22pub mod utils;
23use crate::component::TransportType;
24pub use utils::watch_and_extract_field;
25
/// Selector describing which registered instances a discovery lookup targets.
///
/// Variants come in two parallel families — endpoints and models — each
/// available at four scopes: everything, one namespace, one component inside a
/// namespace, or one endpoint inside a component. How a query is matched
/// against registered instances is decided by the [`Discovery`] implementation.
///
/// The type is `Hash + Eq`, so it can be used as a map or set key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum DiscoveryQuery {
    /// Endpoint instances at the widest scope (no namespace restriction).
    AllEndpoints,
    /// Endpoint instances scoped to a single namespace.
    NamespacedEndpoints {
        /// Namespace to match.
        namespace: String,
    },
    /// Endpoint instances scoped to one component of a namespace.
    ComponentEndpoints {
        /// Namespace to match.
        namespace: String,
        /// Component to match.
        component: String,
    },
    /// Endpoint instances scoped to one specific endpoint.
    Endpoint {
        /// Namespace to match.
        namespace: String,
        /// Component to match.
        component: String,
        /// Endpoint name to match.
        endpoint: String,
    },
    /// Model instances at the widest scope (no namespace restriction).
    AllModels,
    /// Model instances scoped to a single namespace.
    NamespacedModels {
        /// Namespace to match.
        namespace: String,
    },
    /// Model instances scoped to one component of a namespace.
    ComponentModels {
        /// Namespace to match.
        namespace: String,
        /// Component to match.
        component: String,
    },
    /// Model instances scoped to one specific endpoint.
    EndpointModels {
        /// Namespace to match.
        namespace: String,
        /// Component to match.
        component: String,
        /// Endpoint name to match.
        endpoint: String,
    },
}
61
62#[derive(Debug, Clone, PartialEq, Eq)]
65pub enum DiscoverySpec {
66 Endpoint {
68 namespace: String,
69 component: String,
70 endpoint: String,
71 transport: TransportType,
73 },
74 Model {
75 namespace: String,
76 component: String,
77 endpoint: String,
78 card_json: serde_json::Value,
82 },
83}
84
85impl DiscoverySpec {
86 pub fn from_model<T>(
89 namespace: String,
90 component: String,
91 endpoint: String,
92 card: &T,
93 ) -> Result<Self>
94 where
95 T: Serialize,
96 {
97 let card_json = serde_json::to_value(card)?;
98 Ok(Self::Model {
99 namespace,
100 component,
101 endpoint,
102 card_json,
103 })
104 }
105
106 pub fn with_instance_id(self, instance_id: u64) -> DiscoveryInstance {
108 match self {
109 Self::Endpoint {
110 namespace,
111 component,
112 endpoint,
113 transport,
114 } => DiscoveryInstance::Endpoint(crate::component::Instance {
115 namespace,
116 component,
117 endpoint,
118 instance_id,
119 transport,
120 }),
121 Self::Model {
122 namespace,
123 component,
124 endpoint,
125 card_json,
126 } => DiscoveryInstance::Model {
127 namespace,
128 component,
129 endpoint,
130 instance_id,
131 card_json,
132 },
133 }
134 }
135}
136
137#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
140#[serde(tag = "type")]
141pub enum DiscoveryInstance {
142 Endpoint(crate::component::Instance),
144 Model {
145 namespace: String,
146 component: String,
147 endpoint: String,
148 instance_id: u64,
149 card_json: serde_json::Value,
152 },
153}
154
155impl DiscoveryInstance {
156 pub fn instance_id(&self) -> u64 {
158 match self {
159 Self::Endpoint(inst) => inst.instance_id,
160 Self::Model { instance_id, .. } => *instance_id,
161 }
162 }
163
164 pub fn deserialize_model<T>(&self) -> Result<T>
167 where
168 T: for<'de> Deserialize<'de>,
169 {
170 match self {
171 Self::Model { card_json, .. } => Ok(serde_json::from_value(card_json.clone())?),
172 Self::Endpoint(_) => {
173 anyhow::bail!("Cannot deserialize model from Endpoint instance")
174 }
175 }
176 }
177}
178
179#[derive(Debug, Clone, PartialEq, Eq)]
181pub enum DiscoveryEvent {
182 Added(DiscoveryInstance),
184 Removed(u64),
186}
187
188pub type DiscoveryStream = Pin<Box<dyn Stream<Item = Result<DiscoveryEvent>> + Send>>;
190
191#[async_trait]
193pub trait Discovery: Send + Sync {
194 fn instance_id(&self) -> u64;
197
198 async fn register(&self, spec: DiscoverySpec) -> Result<DiscoveryInstance>;
200
201 async fn list(&self, query: DiscoveryQuery) -> Result<Vec<DiscoveryInstance>>;
204
205 async fn list_and_watch(
208 &self,
209 query: DiscoveryQuery,
210 cancel_token: Option<CancellationToken>,
211 ) -> Result<DiscoveryStream>;
212}