jocker_lib/logs.rs
1use std::sync::Arc;
2
3use tokio::{
4 sync::mpsc::{self, Receiver, Sender},
5 task::JoinSet,
6};
7
8use crate::{
9 common::{Exec, Process, ProcessState},
10 error::{Error, InnerError, Result},
11};
12
13use crate::state::State;
14
15#[derive(Clone, Debug, Default, PartialEq)]
16pub struct LogsArgs {
17 pub follow: bool,
18 pub process_prefix: bool,
19 pub tail: bool,
20 pub processes: Vec<String>,
21}
22
23pub struct Logs {
24 args: LogsArgs,
25 state: Arc<State>,
26}
27
28impl Logs {
29 pub fn new(args: LogsArgs, state: Arc<State>) -> Self {
30 Logs { args, state }
31 }
32
33 pub async fn run(&self) -> Result<(JoinSet<Result<()>>, Receiver<String>)> {
34 let processes = self.state.filter_processes(&self.args.processes).await?;
35 let mut handles = JoinSet::new();
36 let max_process_name_len = processes.iter().fold(0, |acc, e| {
37 if acc < e.name().len() {
38 e.name().len()
39 } else {
40 acc
41 }
42 });
43 let (tx, rx) = mpsc::channel(processes.len() * 2);
44 for process in processes {
45 let state = self.state.clone();
46 handles.spawn(run(
47 state,
48 process,
49 self.args.clone(),
50 max_process_name_len,
51 tx.clone(),
52 ));
53 }
54
55 Ok((handles, rx))
56 }
57}
58
59impl Exec<()> for Logs {
60 async fn exec(&self) -> Result<()> {
61 let (mut handles, mut rx) = self.run().await.unwrap();
62
63 while let Some(message) = rx.recv().await {
64 println!("{message}");
65 }
66
67 while (handles.join_next().await).is_some() {}
68
69 Ok(())
70 }
71}
72
73async fn run(
74 state: Arc<State>,
75 process: Process,
76 args: LogsArgs,
77 max_process_name_len: usize,
78 log_tx: Sender<String>,
79) -> Result<()> {
80 let process_name = process.name();
81 // get file
82 // let path = state.filename_log_process(&process);
83
84 // get pos to end of file
85 // let f = File::open(&path).await?;
86 let process_prefix = if args.process_prefix {
87 format!("{process_name:max_process_name_len$} > ")
88 } else {
89 "".to_string()
90 };
91 if !args.tail {
92 // let reader = BufReader::new(f);
93 // let mut lines = reader.lines();
94 state
95 .scheduler()
96 .logs(
97 log_tx,
98 &process_prefix,
99 process.pid().ok_or_else(|| {
100 Error::new(InnerError::Pueue(pueue_lib::Error::Generic(
101 "PID missing for log".to_owned(),
102 )))
103 })?,
104 None,
105 args.follow,
106 )
107 .await?;
108 // while let Ok(Some(line)) = lines.next_line().await {
109 // log_tx
110 // .send(format!("{process_prefix}{}", line))
111 // .await
112 // .unwrap();
113 // }
114 }
115
116 if !args.follow || process.state == ProcessState::Stopped {
117 return Ok(());
118 }
119
120 // set up watcher
121 // let mut f = File::open(&path).await?;
122 // let mut pos = f.metadata().await?.len();
123 // f.seek(SeekFrom::Start(pos)).await?;
124 // pos = f.metadata().await?.len();
125 // let (tx, rx) = std::sync::mpsc::channel();
126 // let mut watcher = RecommendedWatcher::new(tx, Config::default())?;
127 // watcher.watch(path.as_ref(), RecursiveMode::NonRecursive)?;
128 //
129 // // watch
130 // for res in rx {
131 // match res {
132 // Ok(_event) => {
133 // // ignore any event that didn't change the pos
134 // if f.metadata().await?.len() == pos {
135 // continue;
136 // }
137 //
138 // // read from pos to end of file
139 // f.seek(std::io::SeekFrom::Start(pos)).await?;
140 //
141 // // update post to end of file
142 // pos = f.metadata().await?.len();
143 //
144 // let reader = BufReader::new(f.try_clone().await?);
145 // let mut lines = reader.lines();
146 // while let Ok(Some(line)) = lines.next_line().await {
147 // log_tx
148 // .send(format!("{process_prefix}{}", line,))
149 // .await
150 // .unwrap();
151 // }
152 // }
153 // Err(error) => println!("{error:?}"),
154 // }
155 // }
156
157 Ok(())
158}