1use anyhow::Result;
5use async_trait::async_trait;
6use futures::Stream;
7use serde::{Deserialize, Serialize};
8use std::pin::Pin;
9use tokio_util::sync::CancellationToken;
10
11mod metadata;
12pub use metadata::{DiscoveryMetadata, MetadataSnapshot};
13
14mod mock;
15pub use mock::{MockDiscovery, SharedMockRegistry};
16mod kv_store;
17pub use kv_store::KVStoreDiscovery;
18
19mod kube;
20pub use kube::{KubeDiscoveryClient, hash_pod_name};
21
22pub mod utils;
23use crate::component::TransportType;
24pub use utils::watch_and_extract_field;
25
26#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
31#[serde(rename_all = "snake_case")]
32pub enum EventTransportKind {
33 #[default]
35 Nats,
36 Zmq,
38}
39
40impl EventTransportKind {
41 pub fn from_env() -> Result<Self> {
45 match std::env::var(crate::config::environment_names::event_plane::DYN_EVENT_PLANE)
46 .as_deref()
47 {
48 Ok("nats") | Ok("") | Err(_) => Ok(Self::Nats),
49 Ok("zmq") => Ok(Self::Zmq),
50 Ok(other) => anyhow::bail!(
51 "Invalid DYN_EVENT_PLANE value '{}'. Valid values: 'nats', 'zmq'",
52 other
53 ),
54 }
55 }
56
57 pub fn from_env_or_default() -> Self {
60 Self::from_env().unwrap_or_else(|e| {
61 tracing::warn!("{}, defaulting to NATS", e);
62 Self::Nats
63 })
64 }
65
66 pub fn default_codec(&self) -> EventCodecKind {
69 match self {
70 Self::Nats => EventCodecKind::Json,
71 Self::Zmq => EventCodecKind::Msgpack,
72 }
73 }
74}
75
76#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
80#[serde(rename_all = "snake_case")]
81pub enum EventCodecKind {
82 Json,
84 Msgpack,
86}
87
88impl EventCodecKind {
89 pub fn from_env() -> Result<Option<Self>> {
93 match std::env::var(crate::config::environment_names::event_plane::DYN_EVENT_PLANE_CODEC)
94 .as_deref()
95 {
96 Err(_) => Ok(None), Ok("") => Ok(None), Ok("json") => Ok(Some(Self::Json)),
99 Ok("msgpack") => Ok(Some(Self::Msgpack)),
100 Ok(other) => anyhow::bail!(
101 "Invalid DYN_EVENT_PLANE_CODEC value '{}'. Valid values: 'json', 'msgpack'",
102 other
103 ),
104 }
105 }
106
107 pub fn from_env_or_transport_default(transport: EventTransportKind) -> Self {
110 Self::from_env()
111 .unwrap_or_else(|e| {
112 tracing::warn!(
113 "{}, defaulting to {:?} for {:?}",
114 e,
115 transport.default_codec(),
116 transport
117 );
118 None
119 })
120 .unwrap_or_else(|| transport.default_codec())
121 }
122}
123
124#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
129#[serde(tag = "kind", content = "config")]
130pub enum EventTransport {
131 Nats {
133 subject_prefix: String,
135 },
136 Zmq {
138 endpoint: String,
140 },
141 ZmqBroker {
143 xsub_endpoints: Vec<String>,
145 xpub_endpoints: Vec<String>,
147 },
148}
149
150impl EventTransport {
151 pub fn kind(&self) -> EventTransportKind {
153 match self {
154 Self::Nats { .. } => EventTransportKind::Nats,
155 Self::Zmq { .. } | Self::ZmqBroker { .. } => EventTransportKind::Zmq,
156 }
157 }
158
159 pub fn nats(subject_prefix: impl Into<String>) -> Self {
161 Self::Nats {
162 subject_prefix: subject_prefix.into(),
163 }
164 }
165
166 pub fn zmq(endpoint: impl Into<String>) -> Self {
168 Self::Zmq {
169 endpoint: endpoint.into(),
170 }
171 }
172
173 pub fn address(&self) -> &str {
176 match self {
177 Self::Nats { subject_prefix } => subject_prefix,
178 Self::Zmq { endpoint } => endpoint,
179 Self::ZmqBroker { xsub_endpoints, .. } => {
180 xsub_endpoints.first().map(|s| s.as_str()).unwrap_or("")
181 }
182 }
183 }
184}
185
186#[derive(Debug, Clone, PartialEq, Eq, Hash)]
189pub enum DiscoveryQuery {
190 AllEndpoints,
192 NamespacedEndpoints {
194 namespace: String,
195 },
196 ComponentEndpoints {
198 namespace: String,
199 component: String,
200 },
201 Endpoint {
203 namespace: String,
204 component: String,
205 endpoint: String,
206 },
207 AllModels,
208 NamespacedModels {
209 namespace: String,
210 },
211 ComponentModels {
212 namespace: String,
213 component: String,
214 },
215 EndpointModels {
216 namespace: String,
217 component: String,
218 endpoint: String,
219 },
220 EventChannels(EventChannelQuery),
222}
223
224#[derive(Debug, Clone, PartialEq, Eq, Hash)]
226pub struct EventChannelQuery {
227 pub namespace: Option<String>,
229 pub component: Option<String>,
231 pub topic: Option<String>,
233}
234
235impl EventChannelQuery {
236 pub fn all() -> Self {
238 Self {
239 namespace: None,
240 component: None,
241 topic: None,
242 }
243 }
244
245 pub fn namespace(namespace: impl Into<String>) -> Self {
247 Self {
248 namespace: Some(namespace.into()),
249 component: None,
250 topic: None,
251 }
252 }
253
254 pub fn component(namespace: impl Into<String>, component: impl Into<String>) -> Self {
256 Self {
257 namespace: Some(namespace.into()),
258 component: Some(component.into()),
259 topic: None,
260 }
261 }
262
263 pub fn topic(
265 namespace: impl Into<String>,
266 component: impl Into<String>,
267 topic: impl Into<String>,
268 ) -> Self {
269 Self {
270 namespace: Some(namespace.into()),
271 component: Some(component.into()),
272 topic: Some(topic.into()),
273 }
274 }
275
276 pub fn scope_level(&self) -> u8 {
278 if self.topic.is_some() {
279 3
280 } else if self.component.is_some() {
281 2
282 } else if self.namespace.is_some() {
283 1
284 } else {
285 0
286 }
287 }
288}
289
290#[derive(Debug, Clone, PartialEq, Eq)]
293pub enum DiscoverySpec {
294 Endpoint {
296 namespace: String,
297 component: String,
298 endpoint: String,
299 transport: TransportType,
301 },
302 Model {
303 namespace: String,
304 component: String,
305 endpoint: String,
306 card_json: serde_json::Value,
310 model_suffix: Option<String>,
313 },
314 EventChannel {
317 namespace: String,
318 component: String,
319 topic: String,
321 transport: EventTransport,
323 },
324}
325
326impl DiscoverySpec {
327 pub fn from_model<T>(
330 namespace: String,
331 component: String,
332 endpoint: String,
333 card: &T,
334 ) -> Result<Self>
335 where
336 T: Serialize,
337 {
338 Self::from_model_with_suffix(namespace, component, endpoint, card, None)
339 }
340
341 pub fn from_model_with_suffix<T>(
344 namespace: String,
345 component: String,
346 endpoint: String,
347 card: &T,
348 model_suffix: Option<String>,
349 ) -> Result<Self>
350 where
351 T: Serialize,
352 {
353 let card_json = serde_json::to_value(card)?;
354 Ok(Self::Model {
355 namespace,
356 component,
357 endpoint,
358 card_json,
359 model_suffix,
360 })
361 }
362
363 pub fn with_instance_id(self, instance_id: u64) -> DiscoveryInstance {
365 match self {
366 Self::Endpoint {
367 namespace,
368 component,
369 endpoint,
370 transport,
371 } => DiscoveryInstance::Endpoint(crate::component::Instance {
372 namespace,
373 component,
374 endpoint,
375 instance_id,
376 transport,
377 }),
378 Self::Model {
379 namespace,
380 component,
381 endpoint,
382 card_json,
383 model_suffix,
384 } => DiscoveryInstance::Model {
385 namespace,
386 component,
387 endpoint,
388 instance_id,
389 card_json,
390 model_suffix,
391 },
392 Self::EventChannel {
393 namespace,
394 component,
395 topic,
396 transport,
397 } => DiscoveryInstance::EventChannel {
398 namespace,
399 component,
400 topic,
401 instance_id,
402 transport,
403 },
404 }
405 }
406}
407
408#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
411#[serde(tag = "type")]
412pub enum DiscoveryInstance {
413 Endpoint(crate::component::Instance),
415 Model {
416 namespace: String,
417 component: String,
418 endpoint: String,
419 instance_id: u64,
420 card_json: serde_json::Value,
423 #[serde(default, skip_serializing_if = "Option::is_none")]
425 model_suffix: Option<String>,
426 },
427 EventChannel {
429 namespace: String,
430 component: String,
431 topic: String,
433 instance_id: u64,
434 transport: EventTransport,
436 },
437}
438
439impl DiscoveryInstance {
440 pub fn instance_id(&self) -> u64 {
442 match self {
443 Self::Endpoint(inst) => inst.instance_id,
444 Self::Model { instance_id, .. } => *instance_id,
445 Self::EventChannel { instance_id, .. } => *instance_id,
446 }
447 }
448
449 pub fn deserialize_model<T>(&self) -> Result<T>
452 where
453 T: for<'de> Deserialize<'de>,
454 {
455 match self {
456 Self::Model { card_json, .. } => Ok(serde_json::from_value(card_json.clone())?),
457 Self::Endpoint(_) => {
458 anyhow::bail!("Cannot deserialize model from Endpoint instance")
459 }
460 Self::EventChannel { .. } => {
461 anyhow::bail!("Cannot deserialize model from EventChannel instance")
462 }
463 }
464 }
465
466 pub fn id(&self) -> DiscoveryInstanceId {
469 match self {
470 Self::Endpoint(inst) => DiscoveryInstanceId::Endpoint(EndpointInstanceId {
471 namespace: inst.namespace.clone(),
472 component: inst.component.clone(),
473 endpoint: inst.endpoint.clone(),
474 instance_id: inst.instance_id,
475 }),
476 Self::Model {
477 namespace,
478 component,
479 endpoint,
480 instance_id,
481 model_suffix,
482 ..
483 } => DiscoveryInstanceId::Model(ModelCardInstanceId {
484 namespace: namespace.clone(),
485 component: component.clone(),
486 endpoint: endpoint.clone(),
487 instance_id: *instance_id,
488 model_suffix: model_suffix.clone(),
489 }),
490 Self::EventChannel {
491 namespace,
492 component,
493 topic,
494 instance_id,
495 ..
496 } => DiscoveryInstanceId::EventChannel(EventChannelInstanceId {
497 namespace: namespace.clone(),
498 component: component.clone(),
499 topic: topic.clone(),
500 instance_id: *instance_id,
501 }),
502 }
503 }
504}
505
506#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
508pub struct EndpointInstanceId {
509 pub namespace: String,
510 pub component: String,
511 pub endpoint: String,
512 pub instance_id: u64,
513}
514
515impl EndpointInstanceId {
516 pub fn to_path(&self) -> String {
518 format!(
519 "{}/{}/{}/{:x}",
520 self.namespace, self.component, self.endpoint, self.instance_id
521 )
522 }
523
524 pub fn from_path(path: &str) -> Result<Self> {
526 let parts: Vec<&str> = path.split('/').collect();
527 if parts.len() != 4 {
528 anyhow::bail!(
529 "Invalid EndpointInstanceId path: expected 4 parts, got {}",
530 parts.len()
531 );
532 }
533 Ok(Self {
534 namespace: parts[0].to_string(),
535 component: parts[1].to_string(),
536 endpoint: parts[2].to_string(),
537 instance_id: u64::from_str_radix(parts[3], 16)
538 .map_err(|e| anyhow::anyhow!("Invalid instance_id hex: {}", e))?,
539 })
540 }
541}
542
543#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
546pub struct ModelCardInstanceId {
547 pub namespace: String,
548 pub component: String,
549 pub endpoint: String,
550 pub instance_id: u64,
551 pub model_suffix: Option<String>,
553}
554
555#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
557pub struct EventChannelInstanceId {
558 pub namespace: String,
559 pub component: String,
560 pub topic: String,
562 pub instance_id: u64,
563}
564
565impl EventChannelInstanceId {
566 pub fn to_path(&self) -> String {
568 format!(
569 "{}/{}/{}/{:x}",
570 self.namespace, self.component, self.topic, self.instance_id
571 )
572 }
573
574 pub fn from_path(path: &str) -> Result<Self> {
576 let parts: Vec<&str> = path.split('/').collect();
577 if parts.len() != 4 {
578 anyhow::bail!(
579 "Invalid EventChannelInstanceId path: expected 4 parts, got {}",
580 parts.len()
581 );
582 }
583 Ok(Self {
584 namespace: parts[0].to_string(),
585 component: parts[1].to_string(),
586 topic: parts[2].to_string(),
587 instance_id: u64::from_str_radix(parts[3], 16)
588 .map_err(|e| anyhow::anyhow!("Invalid instance_id hex: {}", e))?,
589 })
590 }
591}
592
593impl ModelCardInstanceId {
594 pub fn to_path(&self) -> String {
596 match &self.model_suffix {
597 Some(suffix) => format!(
598 "{}/{}/{}/{:x}/{}",
599 self.namespace, self.component, self.endpoint, self.instance_id, suffix
600 ),
601 None => format!(
602 "{}/{}/{}/{:x}",
603 self.namespace, self.component, self.endpoint, self.instance_id
604 ),
605 }
606 }
607
608 pub fn from_path(path: &str) -> Result<Self> {
610 let parts: Vec<&str> = path.split('/').collect();
611 if parts.len() < 4 || parts.len() > 5 {
612 anyhow::bail!(
613 "Invalid ModelCardInstanceId path: expected 4 or 5 parts, got {}",
614 parts.len()
615 );
616 }
617 Ok(Self {
618 namespace: parts[0].to_string(),
619 component: parts[1].to_string(),
620 endpoint: parts[2].to_string(),
621 instance_id: u64::from_str_radix(parts[3], 16)
622 .map_err(|e| anyhow::anyhow!("Invalid instance_id hex: {}", e))?,
623 model_suffix: parts.get(4).map(|s| s.to_string()),
624 })
625 }
626}
627
628#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
630pub enum DiscoveryInstanceId {
631 Endpoint(EndpointInstanceId),
632 Model(ModelCardInstanceId),
633 EventChannel(EventChannelInstanceId),
634}
635
636impl DiscoveryInstanceId {
637 pub fn instance_id(&self) -> u64 {
639 match self {
640 Self::Endpoint(eid) => eid.instance_id,
641 Self::Model(mid) => mid.instance_id,
642 Self::EventChannel(ecid) => ecid.instance_id,
643 }
644 }
645
646 pub fn extract_endpoint_id(&self) -> Result<&EndpointInstanceId> {
648 match self {
649 Self::Endpoint(eid) => Ok(eid),
650 Self::Model(_) => anyhow::bail!("Expected Endpoint variant, got Model"),
651 Self::EventChannel(_) => anyhow::bail!("Expected Endpoint variant, got EventChannel"),
652 }
653 }
654
655 pub fn extract_model_id(&self) -> Result<&ModelCardInstanceId> {
657 match self {
658 Self::Model(mid) => Ok(mid),
659 Self::Endpoint(_) => anyhow::bail!("Expected Model variant, got Endpoint"),
660 Self::EventChannel(_) => anyhow::bail!("Expected Model variant, got EventChannel"),
661 }
662 }
663
664 pub fn extract_event_channel_id(&self) -> Result<&EventChannelInstanceId> {
666 match self {
667 Self::EventChannel(ecid) => Ok(ecid),
668 Self::Endpoint(_) => anyhow::bail!("Expected EventChannel variant, got Endpoint"),
669 Self::Model(_) => anyhow::bail!("Expected EventChannel variant, got Model"),
670 }
671 }
672}
673
674#[derive(Debug, Clone, PartialEq, Eq)]
676pub enum DiscoveryEvent {
677 Added(DiscoveryInstance),
679 Removed(DiscoveryInstanceId),
681}
682
683pub type DiscoveryStream = Pin<Box<dyn Stream<Item = Result<DiscoveryEvent>> + Send>>;
685
686#[async_trait]
688pub trait Discovery: Send + Sync {
689 fn instance_id(&self) -> u64;
692
693 async fn register(&self, spec: DiscoverySpec) -> Result<DiscoveryInstance>;
695
696 async fn unregister(&self, instance: DiscoveryInstance) -> Result<()>;
698
699 async fn list(&self, query: DiscoveryQuery) -> Result<Vec<DiscoveryInstance>>;
702
703 async fn list_and_watch(
706 &self,
707 query: DiscoveryQuery,
708 cancel_token: Option<CancellationToken>,
709 ) -> Result<DiscoveryStream>;
710}