// poddy/app/state/list.rs

1use crate::{
2    client::Client,
3    input::key::Key,
4    k8s::Reflector,
5    ui::{state::Paging, StateRenderer},
6};
7use anyhow::anyhow;
8use futures::StreamExt;
9use k8s_openapi::serde::de::DeserializeOwned;
10use kube::runtime::reflector::Store;
11use log::log_enabled;
12use std::{
13    fmt::Debug,
14    future::Future,
15    hash::Hash,
16    pin::Pin,
17    sync::{Arc, Mutex},
18};
19use tokio::{
20    spawn,
21    sync::mpsc::{channel, Receiver, Sender},
22    task::JoinHandle,
23};
24use tui::{style::*, text::*, widgets::*};
25
26pub trait ListResource: Sized {
27    type Resource: kube::Resource
28        + Clone
29        + Default
30        + Debug
31        + Send
32        + Sync
33        + DeserializeOwned
34        + 'static;
35    type Message: Send + Sync + 'static;
36
37    fn render<SR: StateRenderer>(ctx: &Context<Self>, mut r: SR)
38    where
39        <<Self as ListResource>::Resource as kube::Resource>::DynamicType: Hash + Eq + Clone,
40    {
41        let mut state = ctx.state.lock().unwrap();
42
43        match *state {
44            State::Loading => {
45                let table = Self::render_table(&mut []);
46                r.render(table);
47            }
48            State::List(ref items, ref mut state) => {
49                let mut items = items.state();
50                let table = { Self::render_table(&mut items) };
51                let empty = items.is_empty();
52
53                if state.selected().is_none() && !empty {
54                    state.select(Some(0));
55                }
56
57                r.render_stateful(table, state);
58            }
59            State::Error(ref err) => {
60                let err = err.to_string();
61                let w = Paragraph::new(err)
62                    .style(Style::default().bg(Color::Rgb(128, 0, 0)))
63                    .block(
64                        Block::default()
65                            .title(Span::styled(
66                                "Error",
67                                Style::default().add_modifier(Modifier::BOLD),
68                            ))
69                            .borders(Borders::ALL),
70                    );
71                r.render(w);
72            }
73        }
74    }
75
76    fn render_table<'r, 'a>(items: &'r mut [Arc<Self::Resource>]) -> Table<'a>
77    where
78        <<Self as ListResource>::Resource as kube::Resource>::DynamicType: Hash + Eq;
79
80    #[allow(unused_variables)]
81    fn on_key(items: &[Arc<Self::Resource>], state: &TableState, key: Key) -> Option<Self::Message>
82    where
83        <<Self as ListResource>::Resource as kube::Resource>::DynamicType: Hash + Eq,
84    {
85        None
86    }
87
88    fn process(client: Arc<Client>, msg: Self::Message)
89        -> Pin<Box<dyn Future<Output = ()> + Send>>;
90}
91
92struct Runner<R>
93where
94    R: ListResource,
95    <<R as ListResource>::Resource as kube::Resource>::DynamicType: Hash + Eq,
96{
97    rx: Receiver<R::Message>,
98    client: Client,
99    ctx: Context<R>,
100}
101
102pub struct ListWatcher<R>
103where
104    R: ListResource,
105    <<R as ListResource>::Resource as kube::Resource>::DynamicType: Hash + Eq,
106{
107    _runner: JoinHandle<()>,
108    ctx: Context<R>,
109}
110
111pub enum State<K>
112where
113    K: kube::Resource + 'static,
114    K::DynamicType: Hash + Eq,
115{
116    Loading,
117    List(Store<K>, TableState),
118    Error(anyhow::Error),
119}
120
121pub struct Context<R>
122where
123    R: ListResource,
124    <<R as ListResource>::Resource as kube::Resource>::DynamicType: Hash + Eq,
125{
126    pub state: Arc<Mutex<State<R::Resource>>>,
127    tx: Sender<R::Message>,
128}
129
130impl<R> Clone for Context<R>
131where
132    R: ListResource,
133    <<R as ListResource>::Resource as kube::Resource>::DynamicType: Hash + Eq,
134{
135    fn clone(&self) -> Self {
136        Self {
137            state: self.state.clone(),
138            tx: self.tx.clone(),
139        }
140    }
141}
142
143impl<R> ListWatcher<R>
144where
145    R: ListResource + 'static,
146    <<R as ListResource>::Resource as kube::Resource>::DynamicType:
147        Hash + Eq + Clone + Default + DeserializeOwned,
148{
149    pub fn new(client: Client) -> Self {
150        let (tx, rx) = channel::<R::Message>(10);
151
152        let ctx = Context {
153            tx,
154            state: Arc::new(Mutex::new(State::Loading)),
155        };
156
157        let runner = Runner {
158            rx,
159            client,
160            ctx: ctx.clone(),
161        };
162
163        let runner = spawn(async move {
164            runner.run().await;
165        });
166
167        Self {
168            _runner: runner,
169            ctx,
170        }
171    }
172
173    pub fn render<SR: StateRenderer>(&self, r: SR) {
174        R::render(&self.ctx, r);
175    }
176
177    pub async fn on_key(&self, key: Key) {
178        self.ctx.on_key(key).await;
179    }
180}
181
182impl<R: ListResource> Context<R>
183where
184    <<R as ListResource>::Resource as kube::Resource>::DynamicType: Hash + Eq + Clone,
185{
186    pub async fn on_key(&self, key: Key) {
187        match &mut (*self.state.lock().unwrap()) {
188            State::List(items, state) => {
189                let items = items.state();
190                match key {
191                    Key::Down => state.next(items.len()),
192                    Key::Up => state.prev(items.len()),
193                    _ => {
194                        if let Some(msg) = R::on_key(items.as_slice(), state, key) {
195                            let _ = self.tx.try_send(msg);
196                        }
197                    }
198                }
199            }
200            _ => {}
201        }
202    }
203}
204
205impl<R> Runner<R>
206where
207    R: ListResource,
208    <<R as ListResource>::Resource as kube::Resource>::DynamicType: Hash + Eq + Clone + Default,
209{
210    async fn run(mut self) {
211        let client = self.client.clone();
212        let ctx = self.ctx.clone();
213
214        let reflector = async {
215            let mut reflector: Option<Result<Reflector<R::Resource>, anyhow::Error>> = None;
216
217            'outer: loop {
218                match reflector {
219                    None => {
220                        *ctx.state.lock().unwrap() = State::Loading;
221                        // Create
222                        reflector = Some(Reflector::new(&client).await);
223                    }
224                    Some(Err(err)) => {
225                        // set error
226                        {
227                            *ctx.state.lock().unwrap() = State::Error(anyhow!(err));
228                        }
229                        // create
230                        let r = Reflector::new(&client).await;
231                        log::warn!("Created new reflector - ok: {}", r.is_ok());
232                        reflector = Some(r);
233                    }
234                    // FIXME: need to deal with the case that we could create a watcher, but it
235                    // right away fails. Which results in a red blinking display.
236                    Some(Ok(mut r)) => {
237                        // set store
238                        {
239                            *ctx.state.lock().unwrap() =
240                                State::List(r.reader.clone(), Default::default());
241                        }
242                        // run
243                        while let Some(evt) = r.stream.next().await {
244                            if log_enabled!(log::Level::Info) {
245                                let m = format!("{evt:?}");
246                                log::info!("{}", &m[0..90]);
247                            }
248                            match evt {
249                                Ok(_) => {}
250                                Err(err) => {
251                                    log::warn!("Watch error: {err}");
252                                    reflector = Some(Err(anyhow!(err)));
253                                    continue 'outer;
254                                }
255                            }
256                        }
257                        log::warn!("Stream closed");
258                        reflector = Some(Err(anyhow!("Stream closed")));
259                    }
260                }
261            }
262        };
263
264        let receiver = async {
265            let client = Arc::new(client.clone());
266            while let Some(msg) = self.rx.recv().await {
267                R::process(client.clone(), msg).await;
268            }
269        };
270
271        futures::future::select(Box::pin(reflector), Box::pin(receiver)).await;
272    }
273}