jocker_lib/
logs.rs

1use std::sync::Arc;
2
3use tokio::{
4    sync::mpsc::{self, Receiver, Sender},
5    task::JoinSet,
6};
7
8use crate::{
9    common::{Exec, Process, ProcessState},
10    error::{Error, InnerError, Result},
11};
12
13use crate::state::State;
14
15#[derive(Clone, Debug, Default, PartialEq)]
16pub struct LogsArgs {
17    pub follow: bool,
18    pub process_prefix: bool,
19    pub tail: bool,
20    pub processes: Vec<String>,
21}
22
23pub struct Logs {
24    args: LogsArgs,
25    state: Arc<State>,
26}
27
28impl Logs {
29    pub fn new(args: LogsArgs, state: Arc<State>) -> Self {
30        Logs { args, state }
31    }
32
33    pub async fn run(&self) -> Result<(JoinSet<Result<()>>, Receiver<String>)> {
34        let processes = self.state.filter_processes(&self.args.processes).await?;
35        let mut handles = JoinSet::new();
36        let max_process_name_len = processes.iter().fold(0, |acc, e| {
37            if acc < e.name().len() {
38                e.name().len()
39            } else {
40                acc
41            }
42        });
43        let (tx, rx) = mpsc::channel(processes.len() * 2);
44        for process in processes {
45            let state = self.state.clone();
46            handles.spawn(run(
47                state,
48                process,
49                self.args.clone(),
50                max_process_name_len,
51                tx.clone(),
52            ));
53        }
54
55        Ok((handles, rx))
56    }
57}
58
59impl Exec<()> for Logs {
60    async fn exec(&self) -> Result<()> {
61        let (mut handles, mut rx) = self.run().await.unwrap();
62
63        while let Some(message) = rx.recv().await {
64            println!("{message}");
65        }
66
67        while (handles.join_next().await).is_some() {}
68
69        Ok(())
70    }
71}
72
73async fn run(
74    state: Arc<State>,
75    process: Process,
76    args: LogsArgs,
77    max_process_name_len: usize,
78    log_tx: Sender<String>,
79) -> Result<()> {
80    let process_name = process.name();
81    // get file
82    // let path = state.filename_log_process(&process);
83
84    // get pos to end of file
85    // let f = File::open(&path).await?;
86    let process_prefix = if args.process_prefix {
87        format!("{process_name:max_process_name_len$} > ")
88    } else {
89        "".to_string()
90    };
91    if !args.tail {
92        // let reader = BufReader::new(f);
93        // let mut lines = reader.lines();
94        state
95            .scheduler()
96            .logs(
97                log_tx,
98                &process_prefix,
99                process.pid().ok_or_else(|| {
100                    Error::new(InnerError::Pueue(pueue_lib::Error::Generic(
101                        "PID missing for log".to_owned(),
102                    )))
103                })?,
104                None,
105                args.follow,
106            )
107            .await?;
108        // while let Ok(Some(line)) = lines.next_line().await {
109        //     log_tx
110        //         .send(format!("{process_prefix}{}", line))
111        //         .await
112        //         .unwrap();
113        // }
114    }
115
116    if !args.follow || process.state == ProcessState::Stopped {
117        return Ok(());
118    }
119
120    // set up watcher
121    // let mut f = File::open(&path).await?;
122    // let mut pos = f.metadata().await?.len();
123    // f.seek(SeekFrom::Start(pos)).await?;
124    // pos = f.metadata().await?.len();
125    // let (tx, rx) = std::sync::mpsc::channel();
126    // let mut watcher = RecommendedWatcher::new(tx, Config::default())?;
127    // watcher.watch(path.as_ref(), RecursiveMode::NonRecursive)?;
128    //
129    // // watch
130    // for res in rx {
131    //     match res {
132    //         Ok(_event) => {
133    //             // ignore any event that didn't change the pos
134    //             if f.metadata().await?.len() == pos {
135    //                 continue;
136    //             }
137    //
138    //             // read from pos to end of file
139    //             f.seek(std::io::SeekFrom::Start(pos)).await?;
140    //
141    //             // update post to end of file
142    //             pos = f.metadata().await?.len();
143    //
144    //             let reader = BufReader::new(f.try_clone().await?);
145    //             let mut lines = reader.lines();
146    //             while let Ok(Some(line)) = lines.next_line().await {
147    //                 log_tx
148    //                     .send(format!("{process_prefix}{}", line,))
149    //                     .await
150    //                     .unwrap();
151    //             }
152    //         }
153    //         Err(error) => println!("{error:?}"),
154    //     }
155    // }
156
157    Ok(())
158}