docker_container_log_streamer/
lib.rs

1use futures_util::{StreamExt, SinkExt };
2use futures_core::stream::Stream;
3use warp::filters::ws::{WebSocket, Message};
4use bollard::{ Docker, container::{LogsOptions, LogOutput} };
5use std::{
6    default::Default, 
7    time::{ Duration, SystemTime},
8};
9use serde::{Deserialize, Serialize};
10use std::path::Path;
11use std::fs;
12use std::io::{BufWriter, Write, Error};
13use std::env::current_dir;
14pub mod tests;
15
16/// The query parameters required to execute log streaming.
17#[derive(Debug, Serialize, Deserialize)]
18pub struct WatchQueryParams {
19    /// The container name or id
20    pub container_name: String,
21    /// The session configured by the client
22    pub session_id: String,
23    /// How many seconds the websocket server will wait for the client to respond.
24    pub timeout: u64,
25    /// The key to use in order to access the resources.
26    pub stream_key: String,
27    #[serde(default)]
28    /// Set to true to save logs under /logs directory. Default: false
29    pub save_logs: bool,
30    #[serde(default)]
31    /// This will subract the number of minutes to the current system time. Then it will be used in the since parameter for docker logs.
32    pub since_in_minutes: u64,
33}
34
35/// Subtract the (N) minutes to the current system time.
36/// Use for docker logs "since" parameter.
37fn docker_since(mins: u64) -> i64 {
38    if mins == 0 {
39        return 0;
40    }
41    
42    let mins_to_secs: u64 = mins * 60;
43    let d = Duration::from_secs(mins_to_secs);
44    match SystemTime::now().checked_sub(d) {
45        Some(t) => {
46            match t.duration_since(SystemTime::UNIX_EPOCH) {
47                Ok(d) => {
48                    d.as_secs() as i64
49                }
50                Err(error) =>{
51                    panic!("{:?}",error);
52                }
53            }
54        }
55        None => 0
56    }
57}
58
59/// Establish a connection to docker then execute docker logs
60async fn docker_logs(container_name: String, since_in_mins: u64) ->  impl Stream<Item = Result<LogOutput, bollard::errors::Error>> {
61    let docker = Docker::connect_with_local_defaults().unwrap();
62    let since: i64 = docker_since(since_in_mins);
63    docker.logs(
64        container_name.as_str(),
65        Some(LogsOptions::<String> {
66            follow: true,
67            stdout: true,
68            stderr: true,
69            since:since ,
70            timestamps: true,
71            ..Default::default()
72        })
73    )
74}
75
76/// Send message logs or message to WebSocket Client.
77/// This will listen to the container logs via docker logs
78pub async fn send_message(socket: WebSocket, container_name: String, since_in_mins: u64, session_id: String, client_stream_key: String, config_stream_key: String, timeout: u64, save_logs: bool) { 
79    let (mut tx, mut rx) = socket.split();
80
81    println!("Websocket Connected");
82    println!("Session ID: {}",session_id);
83    println!("Container Name: {}",container_name);
84    println!("Watch Timeout: {}s",timeout);
85    if client_stream_key != config_stream_key {
86        if let Err(error) = tx.send(Message::text("STREAM_KEY_INVALID")).await {
87            eprintln!("STREAM_KEY_INVALID:{:?}",error);
88        }
89    }
90    else {
91        let mut logs = docker_logs(container_name.clone(),since_in_mins).await;
92        let timeout: u64 = if timeout == 0 {
93            30
94        } else {
95            timeout
96        };
97        let file_name: String = format!("{}.log",&session_id);
98        let mut logger: Logger = Logger::new(&file_name); 
99        while let Some(log_result) = logs.next().await { 
100            match log_result {
101                Ok(log_output) => { 
102                    match log_output {
103                        LogOutput::Console { message } =>{
104                            if save_logs {
105                                if let Err(error) = logger.write(&message) {                                                                                                    
106                                    let error_message = format!("Unable to write logs to {}.log. Error: {}",&session_id, error);
107                                    eprint!("{}",error_message);
108                                    if let Err(error) = tx.send(Message::text(error_message)).await {
109                                        eprintln!("Unable to send error message to client.Error: {}",error);
110                                    }
111                                    break
112
113                                }
114                            }
115                            let message = String::from_utf8_lossy(&message);
116                            match tx.send(Message::text(message)).await {
117                                Ok(_) => {
118                                    if let Err(error) = tokio::time::timeout(Duration::from_secs(timeout), rx.next()).await {                                    
119                                        eprintln!("Nothing received from {} with container {}. Error: {:?}",session_id,container_name,error);
120                                        break
121                                    }
122                                }                                
123                                Err(error) => {
124                                    eprintln!("Unable to send message to {} watching container {}. Error: {:?}",session_id,container_name,error);
125                                    break
126                                }
127                            }
128                        }
129                        _ => continue
130                    };
131                },
132                Err(error) => {                
133                    if let Err(error) = tx.send(Message::text(error.to_string())).await {
134                        eprint!("Unable to send message to {}. Error: {}",session_id, error);
135                        break
136                    }
137                }, 
138            }
139        }
140        if let Err(error) = tx.send(Message::text("COMPLETED")).await {
141            eprintln!("Unable to send COMPLETED signal to {} for the closing of container {}. Error: {}",session_id,container_name,error)
142        }
143    }
144    match tx.reunite(rx).unwrap().close().await {
145        Ok(_) => {
146            println!("{} with container {} socket closed.",session_id,container_name);
147        }
148        Err(error) => {
149            eprintln!("Unable to close socket of session {} with container {}. Error: {:?}",session_id,container_name,error);
150        }
151    };
152}
153
154/// Logger struct for handling the writing of logs.
155pub struct Logger {
156    pub file_handler: BufWriter<fs::File>
157}
158
159impl Logger {
160    /// This will automatically creates the log directory(if does not exists) then return the log directory path.
161    pub fn log_dir() -> String {
162        let cdir: String = current_dir().expect("Unable to get current directory for logs.").to_string_lossy().to_string();
163        let log_file_path: String = format!("{}/logs/",cdir); 
164        if let Err(error) = fs::create_dir_all(&log_file_path) {
165            panic!("Unable to create logs base directory. Error: {:?}",error);
166        }
167        log_file_path
168    }
169
170    /// Create a logger instance.
171    pub fn new(file_name: &String) -> Self {        
172        let log_file_path: String = format!("{}/{}",Logger::log_dir(),&file_name);
173        let path: &Path = Path::new(&log_file_path);
174        match fs::OpenOptions::new().create(true).append(true).open(path) {
175            Ok(file) => {
176                Self {
177                    file_handler: BufWriter::new(file)
178                }   
179            }
180            Err(error) => {
181                panic!("Unable to create file handler for {}. Error: {:?}",log_file_path,error);
182            }
183        }
184    }
185
186    /// Write the log contents in a file.
187    pub fn write(&mut self, data: &[u8] ) -> Result<bool,Error> {
188        let f = &mut self.file_handler;
189        if let Err(error) = f.write(data) {
190            Err(error)
191        }
192        else{
193            match f.flush() {
194                Ok(_) => Ok(true),
195                Err(error) => Err(error)
196            }
197        }
198    }
199}