1use crate::{
2 client::Client,
3 input::key::Key,
4 k8s::Reflector,
5 ui::{state::Paging, StateRenderer},
6};
7use anyhow::anyhow;
8use futures::StreamExt;
9use k8s_openapi::serde::de::DeserializeOwned;
10use kube::runtime::reflector::Store;
11use log::log_enabled;
12use std::{
13 fmt::Debug,
14 future::Future,
15 hash::Hash,
16 pin::Pin,
17 sync::{Arc, Mutex},
18};
19use tokio::{
20 spawn,
21 sync::mpsc::{channel, Receiver, Sender},
22 task::JoinHandle,
23};
24use tui::{style::*, text::*, widgets::*};
25
26pub trait ListResource: Sized {
27 type Resource: kube::Resource
28 + Clone
29 + Default
30 + Debug
31 + Send
32 + Sync
33 + DeserializeOwned
34 + 'static;
35 type Message: Send + Sync + 'static;
36
37 fn render<SR: StateRenderer>(ctx: &Context<Self>, mut r: SR)
38 where
39 <<Self as ListResource>::Resource as kube::Resource>::DynamicType: Hash + Eq + Clone,
40 {
41 let mut state = ctx.state.lock().unwrap();
42
43 match *state {
44 State::Loading => {
45 let table = Self::render_table(&mut []);
46 r.render(table);
47 }
48 State::List(ref items, ref mut state) => {
49 let mut items = items.state();
50 let table = { Self::render_table(&mut items) };
51 let empty = items.is_empty();
52
53 if state.selected().is_none() && !empty {
54 state.select(Some(0));
55 }
56
57 r.render_stateful(table, state);
58 }
59 State::Error(ref err) => {
60 let err = err.to_string();
61 let w = Paragraph::new(err)
62 .style(Style::default().bg(Color::Rgb(128, 0, 0)))
63 .block(
64 Block::default()
65 .title(Span::styled(
66 "Error",
67 Style::default().add_modifier(Modifier::BOLD),
68 ))
69 .borders(Borders::ALL),
70 );
71 r.render(w);
72 }
73 }
74 }
75
76 fn render_table<'r, 'a>(items: &'r mut [Arc<Self::Resource>]) -> Table<'a>
77 where
78 <<Self as ListResource>::Resource as kube::Resource>::DynamicType: Hash + Eq;
79
80 #[allow(unused_variables)]
81 fn on_key(items: &[Arc<Self::Resource>], state: &TableState, key: Key) -> Option<Self::Message>
82 where
83 <<Self as ListResource>::Resource as kube::Resource>::DynamicType: Hash + Eq,
84 {
85 None
86 }
87
88 fn process(client: Arc<Client>, msg: Self::Message)
89 -> Pin<Box<dyn Future<Output = ()> + Send>>;
90}
91
92struct Runner<R>
93where
94 R: ListResource,
95 <<R as ListResource>::Resource as kube::Resource>::DynamicType: Hash + Eq,
96{
97 rx: Receiver<R::Message>,
98 client: Client,
99 ctx: Context<R>,
100}
101
102pub struct ListWatcher<R>
103where
104 R: ListResource,
105 <<R as ListResource>::Resource as kube::Resource>::DynamicType: Hash + Eq,
106{
107 _runner: JoinHandle<()>,
108 ctx: Context<R>,
109}
110
111pub enum State<K>
112where
113 K: kube::Resource + 'static,
114 K::DynamicType: Hash + Eq,
115{
116 Loading,
117 List(Store<K>, TableState),
118 Error(anyhow::Error),
119}
120
121pub struct Context<R>
122where
123 R: ListResource,
124 <<R as ListResource>::Resource as kube::Resource>::DynamicType: Hash + Eq,
125{
126 pub state: Arc<Mutex<State<R::Resource>>>,
127 tx: Sender<R::Message>,
128}
129
130impl<R> Clone for Context<R>
131where
132 R: ListResource,
133 <<R as ListResource>::Resource as kube::Resource>::DynamicType: Hash + Eq,
134{
135 fn clone(&self) -> Self {
136 Self {
137 state: self.state.clone(),
138 tx: self.tx.clone(),
139 }
140 }
141}
142
143impl<R> ListWatcher<R>
144where
145 R: ListResource + 'static,
146 <<R as ListResource>::Resource as kube::Resource>::DynamicType:
147 Hash + Eq + Clone + Default + DeserializeOwned,
148{
149 pub fn new(client: Client) -> Self {
150 let (tx, rx) = channel::<R::Message>(10);
151
152 let ctx = Context {
153 tx,
154 state: Arc::new(Mutex::new(State::Loading)),
155 };
156
157 let runner = Runner {
158 rx,
159 client,
160 ctx: ctx.clone(),
161 };
162
163 let runner = spawn(async move {
164 runner.run().await;
165 });
166
167 Self {
168 _runner: runner,
169 ctx,
170 }
171 }
172
173 pub fn render<SR: StateRenderer>(&self, r: SR) {
174 R::render(&self.ctx, r);
175 }
176
177 pub async fn on_key(&self, key: Key) {
178 self.ctx.on_key(key).await;
179 }
180}
181
182impl<R: ListResource> Context<R>
183where
184 <<R as ListResource>::Resource as kube::Resource>::DynamicType: Hash + Eq + Clone,
185{
186 pub async fn on_key(&self, key: Key) {
187 match &mut (*self.state.lock().unwrap()) {
188 State::List(items, state) => {
189 let items = items.state();
190 match key {
191 Key::Down => state.next(items.len()),
192 Key::Up => state.prev(items.len()),
193 _ => {
194 if let Some(msg) = R::on_key(items.as_slice(), state, key) {
195 let _ = self.tx.try_send(msg);
196 }
197 }
198 }
199 }
200 _ => {}
201 }
202 }
203}
204
205impl<R> Runner<R>
206where
207 R: ListResource,
208 <<R as ListResource>::Resource as kube::Resource>::DynamicType: Hash + Eq + Clone + Default,
209{
210 async fn run(mut self) {
211 let client = self.client.clone();
212 let ctx = self.ctx.clone();
213
214 let reflector = async {
215 let mut reflector: Option<Result<Reflector<R::Resource>, anyhow::Error>> = None;
216
217 'outer: loop {
218 match reflector {
219 None => {
220 *ctx.state.lock().unwrap() = State::Loading;
221 reflector = Some(Reflector::new(&client).await);
223 }
224 Some(Err(err)) => {
225 {
227 *ctx.state.lock().unwrap() = State::Error(anyhow!(err));
228 }
229 let r = Reflector::new(&client).await;
231 log::warn!("Created new reflector - ok: {}", r.is_ok());
232 reflector = Some(r);
233 }
234 Some(Ok(mut r)) => {
237 {
239 *ctx.state.lock().unwrap() =
240 State::List(r.reader.clone(), Default::default());
241 }
242 while let Some(evt) = r.stream.next().await {
244 if log_enabled!(log::Level::Info) {
245 let m = format!("{evt:?}");
246 log::info!("{}", &m[0..90]);
247 }
248 match evt {
249 Ok(_) => {}
250 Err(err) => {
251 log::warn!("Watch error: {err}");
252 reflector = Some(Err(anyhow!(err)));
253 continue 'outer;
254 }
255 }
256 }
257 log::warn!("Stream closed");
258 reflector = Some(Err(anyhow!("Stream closed")));
259 }
260 }
261 }
262 };
263
264 let receiver = async {
265 let client = Arc::new(client.clone());
266 while let Some(msg) = self.rx.recv().await {
267 R::process(client.clone(), msg).await;
268 }
269 };
270
271 futures::future::select(Box::pin(reflector), Box::pin(receiver)).await;
272 }
273}