1use std::ops::Sub;
2use std::rc::Rc;
3
4use chrono::{DateTime, Utc};
5use color_eyre::eyre::Result;
6use k8s_openapi::api::core::v1::{Namespace, PodSpec, PodStatus};
7use kube::{api::ListParams, Api, Client as KubeClient, ResourceExt};
8use tokio::{
9 sync::{broadcast, mpsc},
10 task::JoinHandle,
11};
12use tokio_util::sync::CancellationToken;
13
14use crate::event::KubeEvent;
15use crate::kubernetes::PodMetricsApi;
16use crate::tui::Tui;
17
18use super::action::Route;
19use super::job::tail_logs;
20use super::keybind::{KeyContext, DEFAULT_ERROR_HANDLE};
21use super::ui::home::ui_main;
22
23use crate::app::job::{pod_exec, PodExecArgs};
24use crate::app::AppState;
25
26pub struct App {
27 tui: Tui,
28 kube_client: KubeClient,
29 pod_event_rx: broadcast::Receiver<KubeEvent<PodSpec, PodStatus>>,
30 should_quit: bool,
31 app_state: AppState,
32 task0: JoinHandle<()>,
33 task1: JoinHandle<()>,
34 ready: bool,
35
36 cmd_input_writer: Option<mpsc::Sender<String>>,
37
38 ticker: Ticker,
41 pod_metrics_api: PodMetricsApi,
42}
43
44impl App {
45 pub fn new(
46 tui: Tui,
47 kube_event: broadcast::Receiver<KubeEvent<PodSpec, PodStatus>>,
48 kube_client: KubeClient,
49 ) -> Self {
50 let pod_metrics_api = PodMetricsApi::new(kube_client.clone());
51 Self {
52 tui,
53 pod_event_rx: kube_event,
54 app_state: AppState::new(),
55 kube_client,
56 should_quit: false,
57 task0: tokio::spawn(async {}),
58 task1: tokio::spawn(async {}),
59 ready: true,
60 cmd_input_writer: None,
61 ticker: Ticker::new(),
62 pod_metrics_api,
63 }
64 }
65
66 pub async fn run(&mut self) -> Result<()> {
67 let namespace: Api<Namespace> = Api::all(self.kube_client.clone());
68
69 for ns in namespace.list(&ListParams::default()).await.unwrap() {
70 self.app_state
71 .namespace_items
72 .items
73 .push(Rc::from(ns.name_any().as_str()));
74 }
75
76 let mut keyctx: KeyContext = DEFAULT_ERROR_HANDLE;
77 loop {
78 tokio::select! {
79 tui_event = self.tui.next()=> {
80 if let Some(event) = tui_event {
81 keyctx = self.app_state.handle_terminal_key_event(event);
82 }
83 },
84 kube_event = self.pod_event_rx.recv() => {
85 if let Ok(event) = kube_event{
86 keyctx = self.app_state.handle_pod_reflect_event(event);
87 }
88 },
89 }
90
91 if let Some(cmd_context) = self.app_state.consume_command_task() {
93 if let Some(handler) = cmd_context.run_fn {
94 handler(self, Some(cmd_context.stop_fn.clone()));
95 }
96 } else if let Some(handler) = keyctx.handler {
97 handler(self, None);
98 self.ready = true;
99 }
100 if self.ticker.next() && self.app_state.show_handle_pod_metrics() {
101 if let Some((namespace, pod_name)) = self.app_state.pod_name() {
102 let metric = self.pod_metrics_api.get(namespace.as_ref(), pod_name.as_ref()).await.unwrap();
103 self.app_state.handle_pod_metrics(metric);
104 }
105 }
109
110
111 self.draw_ui().await;
112
113 if self.should_quit {
114 break;
115 }
116 }
117
118 Ok(())
119 }
120
121 async fn draw_ui(&mut self) {
122 let stdout_buffer = self.app_state.stdout_buffer.clone();
123 let reader = stdout_buffer.read().await;
124 self.tui
125 .draw(|f| ui_main(f, &mut self.app_state, reader))
126 .unwrap();
127 }
128}
129
130impl App {
132 pub fn handle_next_route(&mut self, _cancel: Option<CancellationToken>) {
133 self.app_state.next_route();
134 }
135
136 pub fn fake_handlefunction(&mut self, _cancel: Option<CancellationToken>) {}
137
138 pub fn handle_quit(&mut self, _cancel: Option<CancellationToken>) {
139 self.should_quit = true;
140 }
141
142 pub fn select_items_next(&mut self, _cancel: Option<CancellationToken>) {
143 if let Route::PodNamespace = self.app_state.cur_route {
144 self.app_state.namespace_items.next();
145 return;
146 }
147
148 self.app_state.cache_items.next();
149 }
150 pub fn select_items_prev(&mut self, _cancel: Option<CancellationToken>) {
151 if let Route::PodNamespace = self.app_state.cur_route {
152 self.app_state.namespace_items.prev();
153 return;
154 }
155 self.app_state.cache_items.prev();
156 }
157
158 pub fn handle_pod_logs(&mut self, cancel: Option<CancellationToken>) {
159 if self.ready {
160 if !self.task0.is_finished() {
161 self.task0.abort();
162 }
163 if !self.task1.is_finished() {
164 self.task1.abort();
165 }
166 let namespace_pod = self.app_state.get_namespaced_pod();
167 if namespace_pod.is_none() {
168 self.app_state.cancel_executor();
169 return;
170 }
171 self.ready = false;
172 let (namespace, pod) = namespace_pod.unwrap();
173
174 let cancel = cancel.unwrap();
175 let (log_writer_tx, mut log_reader_rx): (mpsc::Sender<String>, mpsc::Receiver<String>) =
176 mpsc::channel(10);
177 self.task0 = tokio::spawn(tail_logs(
178 cancel,
179 self.kube_client.clone(),
180 log_writer_tx,
181 pod.to_string(),
182 namespace.to_string(),
183 ));
184 let writer = self.app_state.stdout_buffer_writer();
185 self.task1 = tokio::spawn(async move {
186 {
187 writer.write().await.select_all();
188 writer.write().await.cut();
189 }
190 while let Some(line) = log_reader_rx.recv().await {
191 writer.write().await.insert_str(line.as_str());
192 writer.write().await.insert_newline();
193 }
194 })
195 }
196 }
197
198 pub fn handle_pod_exec(&mut self, cancel: Option<CancellationToken>) {
199 if self.ready {
200 self.ready = false;
201 let cancel = cancel.unwrap();
202 let (input_writer, input_reader): (mpsc::Sender<String>, mpsc::Receiver<String>) =
203 mpsc::channel(10);
204 self.cmd_input_writer = Some(input_writer);
205 let pod_args = PodExecArgs {
206 kube_client: self.kube_client.clone(),
207 pod_name: "default:nginx".to_string(),
208 container: None,
209 };
210 let writer = self.app_state.stdout_buffer_writer();
211 self.task1 = tokio::spawn(pod_exec(cancel, writer, input_reader, pod_args));
212 } else {
213 let writer = self.cmd_input_writer.as_ref().unwrap().clone();
214 let input = self.app_state.user_input.clone();
215 tokio::spawn(async move {
216 writer.send(input).await.expect("send failed");
217 });
218 self.app_state.user_input.clear();
219 }
220 }
221}
222
223impl App {}
225
226impl Drop for App {
227 fn drop(&mut self) {
228 self.should_quit = true;
229 }
230}
231
232impl Drop for AppState {
233 fn drop(&mut self) {}
234}
235
236struct Ticker {
237 update_at: DateTime<Utc>,
238 curr_time: DateTime<Utc>,
239}
240
241impl Ticker {
243 fn new() -> Self {
244 Self {
245 update_at: chrono::Utc::now(),
246 curr_time: chrono::Utc::now(),
247 }
248 }
249
250 fn next(&mut self) -> bool {
251 let current = self.curr_time;
252 self.curr_time = chrono::Utc::now();
253
254 if current.sub(self.update_at).num_seconds() > 1 {
255 self.update_at = chrono::Utc::now();
256 return true;
257 }
258 false
259 }
260}