docker_container_log_streamer/
lib.rs1use futures_util::{StreamExt, SinkExt };
2use futures_core::stream::Stream;
3use warp::filters::ws::{WebSocket, Message};
4use bollard::{ Docker, container::{LogsOptions, LogOutput} };
5use std::{
6 default::Default,
7 time::{ Duration, SystemTime},
8};
9use serde::{Deserialize, Serialize};
10use std::path::Path;
11use std::fs;
12use std::io::{BufWriter, Write, Error};
13use std::env::current_dir;
14pub mod tests;
15
16#[derive(Debug, Serialize, Deserialize)]
18pub struct WatchQueryParams {
19 pub container_name: String,
21 pub session_id: String,
23 pub timeout: u64,
25 pub stream_key: String,
27 #[serde(default)]
28 pub save_logs: bool,
30 #[serde(default)]
31 pub since_in_minutes: u64,
33}
34
/// Converts a "last `mins` minutes" window into a Unix timestamp in seconds.
///
/// Returns `0` when no lower bound applies:
/// * `mins` is `0`;
/// * the window is so large that `now - window` cannot be represented or
///   falls before the Unix epoch.
///
/// This function never panics: oversized, client-supplied windows degrade to
/// "no lower bound" instead of aborting the task.
fn docker_since(mins: u64) -> i64 {
    if mins == 0 {
        return 0;
    }

    // Saturate rather than overflow: `mins` is user-controlled.
    let window = Duration::from_secs(mins.saturating_mul(60));

    SystemTime::now()
        .checked_sub(window)
        // A start time before the epoch has no timestamp representation here.
        .and_then(|start| start.duration_since(SystemTime::UNIX_EPOCH).ok())
        // Clamp before casting so the conversion can never wrap negative.
        .map(|elapsed| elapsed.as_secs().min(i64::MAX as u64) as i64)
        .unwrap_or(0)
}
58
59async fn docker_logs(container_name: String, since_in_mins: u64) -> impl Stream<Item = Result<LogOutput, bollard::errors::Error>> {
61 let docker = Docker::connect_with_local_defaults().unwrap();
62 let since: i64 = docker_since(since_in_mins);
63 docker.logs(
64 container_name.as_str(),
65 Some(LogsOptions::<String> {
66 follow: true,
67 stdout: true,
68 stderr: true,
69 since:since ,
70 timestamps: true,
71 ..Default::default()
72 })
73 )
74}
75
76pub async fn send_message(socket: WebSocket, container_name: String, since_in_mins: u64, session_id: String, client_stream_key: String, config_stream_key: String, timeout: u64, save_logs: bool) {
79 let (mut tx, mut rx) = socket.split();
80
81 println!("Websocket Connected");
82 println!("Session ID: {}",session_id);
83 println!("Container Name: {}",container_name);
84 println!("Watch Timeout: {}s",timeout);
85 if client_stream_key != config_stream_key {
86 if let Err(error) = tx.send(Message::text("STREAM_KEY_INVALID")).await {
87 eprintln!("STREAM_KEY_INVALID:{:?}",error);
88 }
89 }
90 else {
91 let mut logs = docker_logs(container_name.clone(),since_in_mins).await;
92 let timeout: u64 = if timeout == 0 {
93 30
94 } else {
95 timeout
96 };
97 let file_name: String = format!("{}.log",&session_id);
98 let mut logger: Logger = Logger::new(&file_name);
99 while let Some(log_result) = logs.next().await {
100 match log_result {
101 Ok(log_output) => {
102 match log_output {
103 LogOutput::Console { message } =>{
104 if save_logs {
105 if let Err(error) = logger.write(&message) {
106 let error_message = format!("Unable to write logs to {}.log. Error: {}",&session_id, error);
107 eprint!("{}",error_message);
108 if let Err(error) = tx.send(Message::text(error_message)).await {
109 eprintln!("Unable to send error message to client.Error: {}",error);
110 }
111 break
112
113 }
114 }
115 let message = String::from_utf8_lossy(&message);
116 match tx.send(Message::text(message)).await {
117 Ok(_) => {
118 if let Err(error) = tokio::time::timeout(Duration::from_secs(timeout), rx.next()).await {
119 eprintln!("Nothing received from {} with container {}. Error: {:?}",session_id,container_name,error);
120 break
121 }
122 }
123 Err(error) => {
124 eprintln!("Unable to send message to {} watching container {}. Error: {:?}",session_id,container_name,error);
125 break
126 }
127 }
128 }
129 _ => continue
130 };
131 },
132 Err(error) => {
133 if let Err(error) = tx.send(Message::text(error.to_string())).await {
134 eprint!("Unable to send message to {}. Error: {}",session_id, error);
135 break
136 }
137 },
138 }
139 }
140 if let Err(error) = tx.send(Message::text("COMPLETED")).await {
141 eprintln!("Unable to send COMPLETED signal to {} for the closing of container {}. Error: {}",session_id,container_name,error)
142 }
143 }
144 match tx.reunite(rx).unwrap().close().await {
145 Ok(_) => {
146 println!("{} with container {} socket closed.",session_id,container_name);
147 }
148 Err(error) => {
149 eprintln!("Unable to close socket of session {} with container {}. Error: {:?}",session_id,container_name,error);
150 }
151 };
152}
153
/// Appends raw bytes to a log file through a buffered handle.
pub struct Logger {
    /// Buffered writer over the open log file.
    pub file_handler: BufWriter<fs::File>,
}

impl Logger {
    /// Returns `<current working directory>/logs/` (trailing slash included),
    /// creating the directory and any missing parents first.
    ///
    /// # Panics
    /// Panics if the current directory cannot be determined or the directory
    /// cannot be created.
    pub fn log_dir() -> String {
        let cdir: String = current_dir()
            .expect("Unable to get current directory for logs.")
            .to_string_lossy()
            .to_string();
        let log_file_path: String = format!("{}/logs/", cdir);
        if let Err(error) = fs::create_dir_all(&log_file_path) {
            panic!("Unable to create logs base directory. Error: {:?}", error);
        }
        log_file_path
    }

    /// Opens `file_name` inside [`Logger::log_dir`] in append mode, creating
    /// the file if it does not exist yet.
    ///
    /// # Panics
    /// Panics if the file cannot be opened, or if `log_dir` panics.
    pub fn new(file_name: &String) -> Self {
        let log_file_path: String = format!("{}/{}", Logger::log_dir(), &file_name);
        let path: &Path = Path::new(&log_file_path);
        match fs::OpenOptions::new().create(true).append(true).open(path) {
            Ok(file) => Self {
                file_handler: BufWriter::new(file),
            },
            Err(error) => {
                panic!("Unable to create file handler for {}. Error: {:?}", log_file_path, error);
            }
        }
    }

    /// Appends all of `data` to the log file and flushes it to the OS.
    ///
    /// Returns `Ok(true)` on success.
    ///
    /// # Errors
    /// Returns the underlying I/O error if writing or flushing fails.
    pub fn write(&mut self, data: &[u8]) -> Result<bool, Error> {
        // `write_all` retries short writes; a bare `write` may accept only a
        // prefix of `data` and silently drop the rest.
        self.file_handler.write_all(data)?;
        self.file_handler.flush()?;
        Ok(true)
    }
}
199}