1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
use sk_core::k8s::KubeResourceExt;
pub mod dyn_obj_watcher;
pub mod pod_watcher;
use std::collections::HashSet;
use std::mem::take;
use std::pin::Pin;
use async_trait::async_trait;
use clockabilly::prelude::*;
use futures::{
Stream,
StreamExt,
};
use kube::runtime::watcher::Event;
use sk_core::errors::*;
use tokio::sync::mpsc;
use tracing::*;
/// Pinned, boxed, `Send` stream of fallible watcher [`Event`]s for objects of type `T`; this is
/// the input type consumed by [`ObjWatcher`].
pub(super) type ObjStream<T> = Pin<Box<dyn Stream<Item = anyhow::Result<Event<T>>> + Send>>;
/// Callbacks that an [`ObjWatcher`] invokes as it processes watcher events.
///
/// In test builds a mock implementation is generated via `automock`.
#[cfg_attr(test, automock)]
#[async_trait]
pub(crate) trait EventHandler<T: Clone + Send + Sync> {
/// Called with an object that was applied, or that was (re)listed during an init cycle;
/// `ts` is the timestamp the watcher assigned to the triggering stream item.
async fn applied(&mut self, obj: T, ts: i64) -> EmptyResult;
/// Called with the namespaced name of an object that was deleted, or that was found to be
/// missing after an init cycle; `ts` is the timestamp the watcher assigned to the
/// triggering stream item.
async fn deleted(&mut self, ns_name: &str, ts: i64) -> EmptyResult;
}
/// Consumes an [`ObjStream`] and forwards what it observes to an [`EventHandler`], while keeping
/// an index of the namespaced names it has seen so that objects which disappear between two
/// listings can be reported as deleted.
pub struct ObjWatcher<T: Clone + Send + Sync + kube::ResourceExt> {
/// Receives the `applied`/`deleted` callbacks.
handler: Box<dyn EventHandler<T> + Send>,
/// Source of watcher events, drained by `start`.
stream: ObjStream<T>,
/// Provides the timestamp attached to every stream item.
clock: Box<dyn Clockable + Send>,
/// Set once the first `InitDone` has been handled; guards the one-time ready notification.
is_ready: bool,
/// Channel on which `true` is sent the first time an init cycle completes.
ready_tx: mpsc::Sender<bool>,
/// Objects received via `InitApply`, held until the matching `InitDone` arrives.
init_buffer: Vec<T>,
/// Namespaced names of the objects that have been applied and not yet deleted.
index: HashSet<String>,
}
impl<T: Clone + Send + Sync + kube::ResourceExt> ObjWatcher<T> {
    /// Builds a watcher around `handler` and `stream` that timestamps events with a `UtcClock`,
    /// is not yet ready, and starts with an empty init buffer and an empty index.
    fn new(
        handler: Box<dyn EventHandler<T> + Send>,
        stream: ObjStream<T>,
        ready_tx: mpsc::Sender<bool>,
    ) -> ObjWatcher<T> {
        ObjWatcher {
            handler,
            stream,
            clock: UtcClock::boxed(),
            is_ready: false,
            ready_tx,
            init_buffer: vec![],
            index: HashSet::new(),
        }
    }

    /// Drains the stream until it ends, timestamping every item and dispatching successful ones
    /// to [`Self::handle_event`]. Failures are logged and never stop the loop.
    // This is not a reference because it needs to "own" itself when tokio spawns it
    pub async fn start(mut self) {
        while let Some(res) = self.stream.next().await {
            let ts = self.clock.now_ts();
            match res {
                Ok(evt) => {
                    // This error is "sortof" OK, in the sense that if we can't handle a single
                    // event, the tracer can potentially keep going on other events, so we don't
                    // display a stack trace here.
                    if let Err(err) = self.handle_event(&evt, ts).await {
                        error!("could not handle event:\n\n{err}\n");
                    }
                },
                Err(err) => {
                    // However, if there's a fundamental error getting something from the stream,
                    // the tracer can still maybe attempt to keep going, but that indicates
                    // something more problematic and program-stopping is going on, so we display
                    // a stack trace (using skerr).
                    skerr!(err, "watcher received error on stream");
                },
            }
        }
    }

    /// Applies a single watcher event to the handler and to the local index.
    ///
    /// * `Apply` / `Delete` are forwarded to the handler and then recorded in / removed from the
    ///   index.
    /// * `Init` starts a fresh listing, `InitApply` buffers a listed object, and `InitDone`
    ///   replays the buffered objects as `applied` and reports every previously indexed object
    ///   that was not listed again as `deleted`.
    ///
    /// The first completed `InitDone` also sends `true` on the ready channel.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by the handler; the rest of that event is skipped.
    pub(crate) async fn handle_event(&mut self, evt: &Event<T>, ts: i64) -> EmptyResult {
        match evt {
            Event::Apply(obj) => {
                self.handler.applied(obj.clone(), ts).await?;
                self.index.insert(obj.namespaced_name());
            },
            Event::Delete(obj) => {
                let ns_name = obj.namespaced_name();
                self.handler.deleted(&ns_name, ts).await?;
                self.index.remove(&ns_name);
            },
            // A (re)listing is starting.  Drop anything buffered by an earlier listing that
            // never reached InitDone, so stale or duplicate objects can't leak into this one.
            Event::Init => self.init_buffer.clear(),
            Event::InitApply(obj) => self.init_buffer.push(obj.clone()),
            Event::InitDone => {
                // Take ownership of the buffer up front: this leaves it empty even if one of the
                // handler calls below bails out early, and lets us hand the objects to the
                // handler without cloning them a second time.
                let listed = take(&mut self.init_buffer);

                // We swap the old index for an (empty) new one, removing entries from the old
                // and putting them into the new. Then we know that anything left in the old
                // after we're done was deleted in the intervening period.
                let mut old_objs = take(&mut self.index);
                for obj in listed {
                    let ns_name = obj.namespaced_name();
                    old_objs.remove(&ns_name);

                    // We have to unconditionally apply since we don't know if it changed
                    // in the intervening period
                    self.handler.applied(obj, ts).await?;
                    self.index.insert(ns_name);
                }

                for ns_name in &old_objs {
                    self.handler.deleted(ns_name, ts).await?;
                }

                // When the watcher first starts up it lists every matching object, which shows
                // up here as Init, a run of InitApply events, and finally InitDone.  Once we've
                // handled that sequence the first time, we know we have a complete view of the
                // cluster at startup time.
                if !self.is_ready {
                    self.is_ready = true;

                    // The send only fails if the receiving side has gone away; if nobody's
                    // listening on the other end it's "fine", so we just log the error.
                    if let Err(e) = self.ready_tx.send(true).await {
                        error!("failed to notify ready: {e}");
                    }
                }
            },
        }
        Ok(())
    }
}
#[cfg(test)]
use mockall::automock;
#[cfg(test)]
mod tests;
#[cfg(test)]
#[cfg_attr(coverage, coverage(off))]
impl<T> ObjWatcher<T>
where
    T: Clone + Send + Sync + kube::ResourceExt,
{
    /// Test-only constructor that lets the caller inject the clock.
    ///
    /// The watcher starts out already marked ready, so handling `InitDone` never sends on
    /// `ready_tx`; the init buffer and index both start empty.
    pub(crate) fn new_from_parts(
        handler: Box<dyn EventHandler<T> + Send>,
        stream: ObjStream<T>,
        clock: Box<dyn Clockable + Send>,
        ready_tx: mpsc::Sender<bool>,
    ) -> ObjWatcher<T> {
        let init_buffer = Vec::new();
        let index = HashSet::new();

        Self {
            handler,
            stream,
            clock,
            ready_tx,
            is_ready: true,
            init_buffer,
            index,
        }
    }
}