use serde::{Deserialize, Serialize};
use crate::error::KubeError;
use crate::kind::KubeResource;
#[derive(Debug)]
pub enum WatchEvent<R> {
Added(R),
Modified(R),
Deleted(R),
Bookmark { resource_version: String },
Error(KubeError),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RawWatchEvent {
#[serde(rename = "type")]
pub event_type: String,
pub object: serde_json::Value,
}
impl RawWatchEvent {
pub fn into_typed<R: KubeResource>(self) -> Result<WatchEvent<R>, KubeError> {
match self.event_type.as_str() {
"ADDED" => {
let r: R = serde_json::from_value(self.object)
.map_err(|e| KubeError::Decode(format!("ADDED object: {e}")))?;
Ok(WatchEvent::Added(r))
}
"MODIFIED" => {
let r: R = serde_json::from_value(self.object)
.map_err(|e| KubeError::Decode(format!("MODIFIED object: {e}")))?;
Ok(WatchEvent::Modified(r))
}
"DELETED" => {
let r: R = serde_json::from_value(self.object)
.map_err(|e| KubeError::Decode(format!("DELETED object: {e}")))?;
Ok(WatchEvent::Deleted(r))
}
"BOOKMARK" => {
let rv = self
.object
.get("metadata")
.and_then(|m| m.get("resourceVersion"))
.and_then(|v| v.as_str())
.ok_or_else(|| {
KubeError::Decode(
"BOOKMARK missing metadata.resourceVersion".into(),
)
})?
.to_string();
Ok(WatchEvent::Bookmark { resource_version: rv })
}
"ERROR" => {
let code = self.object.get("code").and_then(|c| c.as_u64()).unwrap_or(0) as u16;
let message = self
.object
.get("message")
.and_then(|m| m.as_str())
.unwrap_or("unknown")
.to_string();
Ok(WatchEvent::Error(KubeError::ApiStatus {
code,
kind: crate::error::ApiStatusKind::from_code(code),
message,
}))
}
other => Err(KubeError::Decode(format!(
"unknown watch event type {other:?}"
))),
}
}
}
#[async_trait::async_trait]
pub trait Watcher<R: KubeResource>: Send + Sync {
async fn watch(
&self,
namespace: Option<&str>,
resource_version: &str,
) -> Result<
std::pin::Pin<
Box<dyn futures_core::Stream<Item = Result<WatchEvent<R>, KubeError>> + Send>,
>,
KubeError,
>;
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn raw_added_decodes_to_typed() {
let raw = RawWatchEvent {
event_type: "ADDED".into(),
object: json!({"metadata": {"name": "p1"}}),
};
let bookmark = RawWatchEvent {
event_type: "BOOKMARK".into(),
object: json!({"metadata": {"resourceVersion": "12345"}}),
};
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
struct Stub {
#[serde(default)]
metadata: serde_json::Value,
}
impl crate::kind::KubeResource for Stub {
const GVK: crate::kind::GroupVersionKind = crate::kind::GroupVersionKind {
group: "", version: "v1", kind: "Stub",
};
const GVR: crate::kind::GroupVersionResource = crate::kind::GroupVersionResource {
group: "", version: "v1", resource: "stubs",
};
const SCOPE: crate::kind::Scope = crate::kind::Scope::Namespaced;
fn name(&self) -> std::borrow::Cow<'_, str> { "".into() }
fn namespace(&self) -> Option<std::borrow::Cow<'_, str>> { None }
fn resource_version(&self) -> Option<std::borrow::Cow<'_, str>> { None }
}
let typed: WatchEvent<Stub> = raw.into_typed().unwrap();
assert!(matches!(typed, WatchEvent::Added(_)));
let typed: WatchEvent<Stub> = bookmark.into_typed().unwrap();
assert!(matches!(typed, WatchEvent::Bookmark { .. }));
}
#[test]
fn unknown_event_type_errors() {
let raw = RawWatchEvent {
event_type: "GARBAGE".into(),
object: json!({}),
};
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
struct Stub;
impl crate::kind::KubeResource for Stub {
const GVK: crate::kind::GroupVersionKind = crate::kind::GroupVersionKind {
group: "", version: "v1", kind: "Stub",
};
const GVR: crate::kind::GroupVersionResource = crate::kind::GroupVersionResource {
group: "", version: "v1", resource: "stubs",
};
const SCOPE: crate::kind::Scope = crate::kind::Scope::Namespaced;
fn name(&self) -> std::borrow::Cow<'_, str> { "".into() }
fn namespace(&self) -> Option<std::borrow::Cow<'_, str>> { None }
fn resource_version(&self) -> Option<std::borrow::Cow<'_, str>> { None }
}
let r: Result<WatchEvent<Stub>, _> = raw.into_typed();
assert!(r.is_err());
assert!(matches!(r.unwrap_err(), KubeError::Decode(_)));
}
#[test]
fn error_event_decodes_to_apistatus() {
let raw = RawWatchEvent {
event_type: "ERROR".into(),
object: json!({"code": 410, "message": "too old resource version"}),
};
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
struct Stub;
impl crate::kind::KubeResource for Stub {
const GVK: crate::kind::GroupVersionKind = crate::kind::GroupVersionKind {
group: "", version: "v1", kind: "Stub",
};
const GVR: crate::kind::GroupVersionResource = crate::kind::GroupVersionResource {
group: "", version: "v1", resource: "stubs",
};
const SCOPE: crate::kind::Scope = crate::kind::Scope::Namespaced;
fn name(&self) -> std::borrow::Cow<'_, str> { "".into() }
fn namespace(&self) -> Option<std::borrow::Cow<'_, str>> { None }
fn resource_version(&self) -> Option<std::borrow::Cow<'_, str>> { None }
}
let typed: WatchEvent<Stub> = raw.into_typed().unwrap();
match typed {
WatchEvent::Error(KubeError::ApiStatus { code, kind, .. }) => {
assert_eq!(code, 410);
assert_eq!(kind, crate::error::ApiStatusKind::Gone);
}
other => panic!("expected Error event, got {other:?}"),
}
}
}