libksre/app/
core.rs

1use std::ops::Sub;
2use std::rc::Rc;
3
4use chrono::{DateTime, Utc};
5use color_eyre::eyre::Result;
6use k8s_openapi::api::core::v1::{Namespace, PodSpec, PodStatus};
7use kube::{api::ListParams, Api, Client as KubeClient, ResourceExt};
8use tokio::{
9    sync::{broadcast, mpsc},
10    task::JoinHandle,
11};
12use tokio_util::sync::CancellationToken;
13
14use crate::event::KubeEvent;
15use crate::kubernetes::PodMetricsApi;
16use crate::tui::Tui;
17
18use super::action::Route;
19use super::job::tail_logs;
20use super::keybind::{KeyContext, DEFAULT_ERROR_HANDLE};
21use super::ui::home::ui_main;
22
23use crate::app::job::{pod_exec, PodExecArgs};
24use crate::app::AppState;
25
26pub struct App {
27    tui: Tui,
28    kube_client: KubeClient,
29    pod_event_rx: broadcast::Receiver<KubeEvent<PodSpec, PodStatus>>,
30    should_quit: bool,
31    app_state: AppState,
32    task0: JoinHandle<()>,
33    task1: JoinHandle<()>,
34    ready: bool,
35
36    cmd_input_writer: Option<mpsc::Sender<String>>,
37
38    // handler
39    // used for fetch pod metrics from kube-metrics-apiserver
40    ticker: Ticker,
41    pod_metrics_api: PodMetricsApi,
42}
43
44impl App {
45    pub fn new(
46        tui: Tui,
47        kube_event: broadcast::Receiver<KubeEvent<PodSpec, PodStatus>>,
48        kube_client: KubeClient,
49    ) -> Self {
50        let pod_metrics_api = PodMetricsApi::new(kube_client.clone());
51        Self {
52            tui,
53            pod_event_rx: kube_event,
54            app_state: AppState::new(),
55            kube_client,
56            should_quit: false,
57            task0: tokio::spawn(async {}),
58            task1: tokio::spawn(async {}),
59            ready: true,
60            cmd_input_writer: None,
61            ticker: Ticker::new(),
62            pod_metrics_api,
63        }
64    }
65
66    pub async fn run(&mut self) -> Result<()> {
67        let namespace: Api<Namespace> = Api::all(self.kube_client.clone());
68
69        for ns in namespace.list(&ListParams::default()).await.unwrap() {
70            self.app_state
71                .namespace_items
72                .items
73                .push(Rc::from(ns.name_any().as_str()));
74        }
75
76        let mut keyctx: KeyContext = DEFAULT_ERROR_HANDLE;
77        loop {
78            tokio::select! {
79                tui_event = self.tui.next()=> {
80                    if let Some(event) = tui_event {
81                        keyctx = self.app_state.handle_terminal_key_event(event);
82                    }
83                },
84                kube_event = self.pod_event_rx.recv() => {
85                    if let Ok(event) = kube_event{
86                        keyctx = self.app_state.handle_pod_reflect_event(event);
87                    }
88                },
89            }
90
91            // 优先判断是否有cmdcontext , 存在command contxt 意味着当前正在处理command 模式
92            if let Some(cmd_context) = self.app_state.consume_command_task() {
93                if let Some(handler) = cmd_context.run_fn {
94                    handler(self, Some(cmd_context.stop_fn.clone()));
95                }
96            } else if let Some(handler) = keyctx.handler {
97                handler(self, None);
98                self.ready = true;
99            }
100            if self.ticker.next() && self.app_state.show_handle_pod_metrics() {
101                if let Some((namespace, pod_name)) = self.app_state.pod_name() {
102                    let metric = self.pod_metrics_api.get(namespace.as_ref(), pod_name.as_ref()).await.unwrap();
103                    self.app_state.handle_pod_metrics(metric);
104                }
105                /* for pod_metric in self.pod_metrics_api.list().await {
106                    self.app_state.handle_pod_metrics(pod_metric);
107                } */
108            }
109
110
111            self.draw_ui().await;
112
113            if self.should_quit {
114                break;
115            }
116        }
117
118        Ok(())
119    }
120
121    async fn draw_ui(&mut self) {
122        let stdout_buffer = self.app_state.stdout_buffer.clone();
123        let reader = stdout_buffer.read().await;
124        self.tui
125            .draw(|f| ui_main(f, &mut self.app_state, reader))
126            .unwrap();
127    }
128}
129
130// all handler
131impl App {
132    pub fn handle_next_route(&mut self, _cancel: Option<CancellationToken>) {
133        self.app_state.next_route();
134    }
135
136    pub fn fake_handlefunction(&mut self, _cancel: Option<CancellationToken>) {}
137
138    pub fn handle_quit(&mut self, _cancel: Option<CancellationToken>) {
139        self.should_quit = true;
140    }
141
142    pub fn select_items_next(&mut self, _cancel: Option<CancellationToken>) {
143        if let Route::PodNamespace = self.app_state.cur_route {
144            self.app_state.namespace_items.next();
145            return;
146        }
147
148        self.app_state.cache_items.next();
149    }
150    pub fn select_items_prev(&mut self, _cancel: Option<CancellationToken>) {
151        if let Route::PodNamespace = self.app_state.cur_route {
152            self.app_state.namespace_items.prev();
153            return;
154        }
155        self.app_state.cache_items.prev();
156    }
157
158    pub fn handle_pod_logs(&mut self, cancel: Option<CancellationToken>) {
159        if self.ready {
160            if !self.task0.is_finished() {
161                self.task0.abort();
162            }
163            if !self.task1.is_finished() {
164                self.task1.abort();
165            }
166            let namespace_pod = self.app_state.get_namespaced_pod();
167            if namespace_pod.is_none() {
168                self.app_state.cancel_executor();
169                return;
170            }
171            self.ready = false;
172            let (namespace, pod) = namespace_pod.unwrap();
173
174            let cancel = cancel.unwrap();
175            let (log_writer_tx, mut log_reader_rx): (mpsc::Sender<String>, mpsc::Receiver<String>) =
176                mpsc::channel(10);
177            self.task0 = tokio::spawn(tail_logs(
178                cancel,
179                self.kube_client.clone(),
180                log_writer_tx,
181                pod.to_string(),
182                namespace.to_string(),
183            ));
184            let writer = self.app_state.stdout_buffer_writer();
185            self.task1 = tokio::spawn(async move {
186                {
187                    writer.write().await.select_all();
188                    writer.write().await.cut();
189                }
190                while let Some(line) = log_reader_rx.recv().await {
191                    writer.write().await.insert_str(line.as_str());
192                    writer.write().await.insert_newline();
193                }
194            })
195        }
196    }
197
198    pub fn handle_pod_exec(&mut self, cancel: Option<CancellationToken>) {
199        if self.ready {
200            self.ready = false;
201            let cancel = cancel.unwrap();
202            let (input_writer, input_reader): (mpsc::Sender<String>, mpsc::Receiver<String>) =
203                mpsc::channel(10);
204            self.cmd_input_writer = Some(input_writer);
205            let pod_args = PodExecArgs {
206                kube_client: self.kube_client.clone(),
207                pod_name: "default:nginx".to_string(),
208                container: None,
209            };
210            let writer = self.app_state.stdout_buffer_writer();
211            self.task1 = tokio::spawn(pod_exec(cancel, writer, input_reader, pod_args));
212        } else {
213            let writer = self.cmd_input_writer.as_ref().unwrap().clone();
214            let input = self.app_state.user_input.clone();
215            tokio::spawn(async move {
216                writer.send(input).await.expect("send failed");
217            });
218            self.app_state.user_input.clear();
219        }
220    }
221}
222
223// app tempoary task relative
224impl App {}
225
226impl Drop for App {
227    fn drop(&mut self) {
228        self.should_quit = true;
229    }
230}
231
232impl Drop for AppState {
233    fn drop(&mut self) {}
234}
235
236struct Ticker {
237    update_at: DateTime<Utc>,
238    curr_time: DateTime<Utc>,
239}
240
241// Name[#TODO] (should add some comments)
242impl Ticker {
243    fn new() -> Self {
244        Self {
245            update_at: chrono::Utc::now(),
246            curr_time: chrono::Utc::now(),
247        }
248    }
249
250    fn next(&mut self) -> bool {
251        let current = self.curr_time;
252        self.curr_time = chrono::Utc::now();
253
254        if current.sub(self.update_at).num_seconds() > 1 {
255            self.update_at = chrono::Utc::now();
256            return true;
257        }
258        false
259    }
260}