process_events_streaming/
lib.rs

1use duct::{cmd, Expression, ReaderHandle};
2use std::ffi::OsString;
3use std::fmt::Error;
4use std::io::{BufRead, BufReader};
5
6use std::sync::Arc;
7use std::thread::JoinHandle;
8use std::{io, thread};
9
/// Events reported to the callback over the life-cycle of a process.
///
/// The enum is a plain tag (no payload), so it is cheap to copy and can be
/// compared directly, e.g. `if *event == ProcessEvent::IOData { .. }`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ProcessEvent {
    /// Default value placeholder
    _Unknown,
    /// The process is about to be started but has not been started yet
    Starting,
    /// The process has been started
    Started,
    /// An error occurred while starting the process itself
    StartError,
    /// The process started but an error occurred while reading its output
    IOError,
    /// The process started and the output reader reached EOF, i.e. no more output is available
    IOEof,
    /// The process started and a line of its output is available now
    IOData,
    /// While handling an `IOData` event the callback returned a result whose
    /// `should_exit` is `Some(true)`, i.e. the API consumer asked for the process to be stopped
    ExitRequested,
    /// The kill API was used to kill the process
    KillRequested,
    /// The process which was started earlier has now exited
    Exited,
    /// An error occurred while killing/stopping the process
    KillError,
}
38
/// Data handed to the callback together with every [`ProcessEvent`].
///
/// The lifetime `'a` is the borrow of the internal reader handle, which is
/// only attached once the process has been started.
pub struct ProcessData<'a> {
    /// The request this event belongs to; `None` until the library attaches it
    pub request: Option<Arc<ProcessRequest>>,
    /// Number of output lines (STDOUT & STDERR combined) read so far;
    /// incremented just before each `IOData` callback
    pub line_number: i64,
    /// For `IOData`: the line just read from the combined STDOUT & STDERR output,
    /// exactly as `read_line` produced it (a trailing newline is not stripped).
    /// For other events it may carry additional detail such as an error description.
    pub line: String,
    /// Internal reader handle for managing the process (used by `kill` and `child_pids`)
    reader: Option<&'a ReaderHandle>,
}
51
52impl ProcessData<'_> {
53    /// Create a new instance of the [`ProcessData`]
54    ///
55    pub fn new() -> Self {
56        Self {
57            request: None,
58            line_number: 0,
59            line: String::new(),
60            reader: None,
61        }
62    }
63    /// Kill the running process
64    pub fn kill(&self) -> io::Result<()> {
65        Ok(if self.reader.is_some() {
66            check_and_trigger_callback(
67                &self.request.as_ref().unwrap(),
68                &ProcessEvent::KillRequested,
69                &self,
70            );
71            return self.reader.as_ref().unwrap().kill();
72        })
73    }
74
75    /// Get the list of child pids
76    pub fn child_pids(&self) -> Vec<u32> {
77        if self.reader.is_some() {
78            return self.reader.as_ref().unwrap().pids();
79        }
80        return vec![];
81    }
82}
83
/// Result data produced by a process execution.
///
/// The same type is used for two purposes: it is what the callback returns for
/// every event (to request an exit and/or hand custom data back), and it is
/// what [`ProcessRequest::start`] returns to the caller.
#[derive(Debug)]
pub struct ProcessResult {
    /// In case of non-blocking mode use this to join and wait for the process to complete
    pub join_handle: Option<io::Result<JoinHandle<ProcessResult>>>,
    /// Whether the process should be stopped, based on the callback's custom conditions
    pub should_exit: Option<bool>,
    /// Whether the process execution was successful or not for the desired outcome
    pub success: Result<bool, std::io::Error>,
    /// Data as String vector
    pub data_vec_str: Option<Vec<String>>,
    /// Data as true/false
    pub data_bool: Option<bool>,
    /// Data as numeric value i128
    pub data_num: Option<i128>,
    /// Data as f64 value
    pub data_decimal: Option<f64>,
}

impl ProcessResult {
    /// Create an empty result: no join handle, no exit request,
    /// `success` set to `Ok(false)` and no custom data.
    pub fn new() -> Self {
        Self {
            join_handle: None,
            should_exit: None,
            success: Ok(false),
            data_vec_str: None,
            data_bool: None,
            data_num: None,
            data_decimal: None,
        }
    }

    /// Set the join handle (internal: only the library creates worker threads)
    fn set_join_handle(&mut self, join_handle: Option<io::Result<JoinHandle<ProcessResult>>>) {
        self.join_handle = join_handle;
    }

    /// Set the exit flag and the success data in one call.
    ///
    /// Public so that callbacks living outside this crate can use it, as the
    /// usage example of [`ProcessRequest::start`] does.
    ///
    /// # Arguments
    /// * `should_exit` - stored as `Some(should_exit)`; `true` asks the library to stop the process
    /// * `success` - stored as-is in [`ProcessResult::success`]
    pub fn set_exit_flag_and_success(
        &mut self,
        should_exit: bool,
        success: Result<bool, std::io::Error>,
    ) {
        self.should_exit = Some(should_exit);
        self.success = success;
    }
}

impl Default for ProcessResult {
    /// Same as [`ProcessResult::new`].
    fn default() -> Self {
        Self::new()
    }
}
131
// SAFETY (review note): `ProcessRequest::start` moves an `Arc<ProcessRequest>` into a
// spawned thread, which requires `ProcessRequest` to be `Send + Sync`. The `callback`
// field is an `Arc<dyn Fn(..)>` without a `Send + Sync` bound, so the compiler cannot
// verify this on its own and these impls assert it manually. They are only sound if
// every callback stored in a request is itself safe to call and drop from another thread.
// NOTE(review): consider adding `Send + Sync` to the callback's trait-object bound so
// that the compiler enforces this instead.
unsafe impl Sync for ProcessRequest {}
unsafe impl Send for ProcessRequest {}
134
/// A request structure describing a process (or a pipeline of processes) to start
pub struct ProcessRequest {
    /// Custom unique numeric id to relate the various callbacks to a particular process execution session;
    /// it is also used in the name of the worker thread in non-blocking mode
    pub request_id: u32,
    /// `true`: run every command line through the platform shell;
    /// `false`: execute the first element of a command line directly, with the remaining elements as its arguments
    pub use_shell: bool,
    /// `true`: run on an internal thread and return immediately (non-blocking);
    /// `false`: run on the calling thread and return when the process is done (blocking)
    pub non_blocking_mode: bool,
    /// (2D Array) Vector of command lines, each one a vector holding the command and its arguments.
    /// For a single command line one element is enough. For the pipeline use case, where the output
    /// of one command is fed to the next command, use one element per command line.
    pub cmd_line: Vec<Vec<String>>,
    /// Callback receiving the various events and the process output; use `None` for no callbacks.
    /// Returning a result with `should_exit == Some(true)` from an `IOData` event stops the process.
    pub callback: Option<Arc<dyn Fn(&ProcessEvent, &ProcessData) -> ProcessResult + 'static>>,
}
148
149impl ProcessRequest {
150    /**
151     Run a process based on the provided process request which is events based multi process execution(blocking & non-blocking modes) in parallel and with data streaming
152     Generates various events [`ProcessEvent`] according to the process's life-cycle, process's information and data [`ProcessData`] associated with that event
153     # Arguments
154     ProcessRequest : [`ProcessRequest`] // A request structure to start a process
155     # Return
156     ProcessResult : [`ProcessResult`] // Contains join handle for non-blocking and data variables
157     For Blocking mode use join_handle [`Option<io::Result<JoinHandle<ProcessResult>>>`], so the caller can join & wait for the process completion if needed!
158     In the callback set the custom data to be retrieved once process execution is over, which will be returned in response of the join call.
159     # Examples
160     ```
161    // Setup callback for the process events and data streaming
162    //
163    // use process_events_streaming::{ProcessRequest, ProcessResult, ProcessData, ProcessEvent};
164    // use std::{thread, time::Duration};
165    //      let callback = |status: &ProcessEvent, data: &ProcessData| -> ProcessResult {
166    //          match status {
167    //              ProcessEvent::Started => {
168    //                  println!(
169    //                      "Event {:?} | req-id {}  | Pids: {:?}",
170    //                      status,
171    //                      data.request.as_ref().unwrap().request_id,
172    //                      data.child_pids()
173    //                  );
174    //              }
175    //              ProcessEvent::IOData => {
176    //                  println!(
177    //                      "Event {:?} | req-id {} | # {} : {}",
178    //                      status,
179    //                      data.request.as_ref().unwrap().request_id,
180    //                      data.line_number,
181    //                      data.line
182    //                  );
183    //                  //now assume we want to exit the process with some data
184    //                  let mut result = ProcessResult::new();
185    //                  result.set_exit_flag_and_success(true, Ok(true));
186    //                  result.data_num = Some(8111981);
187    //                  result.data_vec_str = Some(vec![String::from("I found my hidden data!")]);
188    //                  return result;
189    //
190    //                  //demo how to kill/stop
191    //                  //_ = data.kill();
192    //              }
193    //              other => {
194    //                  if !data.line.is_empty() {
195    //                      println!(
196    //                          "Event {:?} | req-id {} | additional detail(s): {}",
197    //                          other,
198    //                          data.request.as_ref().unwrap().request_id,
199    //                          data.line
200    //                      );
201    //                  } else {
202    //                      println!(
203    //                          "Event {:?} | req-id {}",
204    //                          other,
205    //                          data.request.as_ref().unwrap().request_id
206    //                      );
207    //                  }
208    //              }
209    //          }
210    //          ProcessResult::new()
211    //      };
212    //
213    //
214    //
215    //    let request2 = ProcessRequest {
216    //        request_id: 151,
217    //        callback: Some(Arc::new(callback)),
218    //        use_shell: true,
219    //        cmd_line: vec![vec![
220    //            String::from("echo"),
221    //            String::from("stdout"),
222    //            String::from("&"),
223    //            String::from("echo"),
224    //            String::from("stderr"),
225    //            String::from(">&2"),
226    //        ]],
227    //        non_blocking_mode: true,
228    //    };
229    //
230    //    // non Blocking mode
231    //    let process_result = ProcessRequest::start(request2);
232    //    println!("Returned from Start! of non blocking");
233    //
234    //    let mut internal_data = ProcessResult::new();
235    //    //check & wait for the non blocking mode
236    //    if process_result.join_handle.is_some() {
237    //        if process_result.join_handle.as_ref().unwrap().is_ok() {
238    //            internal_data = process_result.join_handle.unwrap().unwrap().join().unwrap();
239    //            println!(
240    //                "Start - join waiting over in non blocking mode {:?}",
241    //                internal_data
242    //            );
243    //        } else {
244    //            internal_data.success = Err(process_result.join_handle.unwrap().err().unwrap());
245    //            println!(
246    //                "Start - Error in non blocking mode {:?}",
247    //                internal_data.success
248    //            );
249    //        }
250    //    } else {
251    //        internal_data = process_result;
252    //    }
253    //    println!("result dump : {:?}", internal_data);
254    //
255    //    println!(
256    //     "test_using_sh_output_streaming, start calc in windows {:?}",
257    //     ProcessRequest::start(ProcessRequest {
258    //         request_id: 191,
259    //         callback: Some(Arc::new(callback)),
260    //         use_shell: true,
261    //         cmd_line: vec![vec![String::from("calc")]],
262    //         non_blocking_mode: false,
263    //     }));
264     ```
265    */
266    pub fn start(process_request: ProcessRequest) -> ProcessResult {
267        let request = Arc::new(process_request);
268        if request.non_blocking_mode {
269            let join_handle = thread::Builder::new()
270                .name(format!("pes_th_rq_{}", request.request_id).into())
271                .spawn(move || {
272                    let response = start_process(request);
273                    return response;
274                });
275            let mut result = ProcessResult::new();
276            result.set_join_handle(Some(join_handle));
277            return result;
278        } else {
279            return start_process(request);
280        }
281    }
282}
283
284fn start_process(request: Arc<ProcessRequest>) -> ProcessResult {
285    let mut process_result = ProcessResult::new();
286    let mut process_data = ProcessData::new();
287    process_data.line.clear();
288    process_data.request = Some(Arc::clone(&request));
289    if request.as_ref().cmd_line.len() == 0 || request.as_ref().cmd_line[0].len() == 0 {
290        process_data
291            .line
292            .push_str(format!("{:?}", "Command line - arguments are unavailable!").as_str());
293        return check_and_trigger_callback(&request, &ProcessEvent::StartError, &process_data);
294    }
295    process_data.line.push_str(
296        format!(
297            "Executing in thread-context -> id: {:?}, name: {:?}",
298            thread::current().id(),
299            thread::current().name()
300        )
301        .as_str(),
302    );
303    process_result = check_and_trigger_callback(&request, &ProcessEvent::Starting, &process_data);
304
305    let process_req = &request;
306    let stdout_reader = handle_pipeline(&request).stderr_to_stdout().reader();
307    if stdout_reader.as_ref().is_ok() {
308        process_data.reader = Some(stdout_reader.as_ref().unwrap());
309    }
310    match stdout_reader.as_ref() {
311        Ok(stdout_reader) => {
312            process_result =
313                check_and_trigger_callback(process_req, &ProcessEvent::Started, &process_data);
314            let mut buffer_reader = BufReader::new(stdout_reader);
315            loop {
316                process_data.line.clear();
317                let result = buffer_reader.read_line(&mut process_data.line);
318                match result {
319                    Ok(result) if result == 0 => {
320                        check_and_trigger_callback(
321                            process_req,
322                            &ProcessEvent::IOEof,
323                            &process_data,
324                        );
325                        break;
326                    }
327                    Ok(_result) => {
328                        process_data.line_number += 1;
329                        process_result = check_and_trigger_callback(
330                            process_req,
331                            &ProcessEvent::IOData,
332                            &process_data,
333                        );
334                        match &process_result {
335                            process_result
336                                if process_result.should_exit.is_some()
337                                    && process_result.should_exit.unwrap() == true =>
338                            {
339                                check_and_trigger_callback(
340                                    process_req,
341                                    &ProcessEvent::ExitRequested,
342                                    &process_data,
343                                );
344                                break;
345                            }
346
347                            _other => {}
348                        }
349                    }
350                    Err(error) => {
351                        process_data.line.push_str(format!("{:?}", error).as_str());
352                        check_and_trigger_callback(
353                            process_req,
354                            &ProcessEvent::IOError,
355                            &process_data,
356                        );
357                        break;
358                    }
359                }
360            }
361            process_data.line.clear();
362            let exit_result = stdout_reader.kill();
363
364            match exit_result {
365                Ok(_) => {
366                    check_and_trigger_callback(process_req, &ProcessEvent::Exited, &process_data);
367                }
368                Err(_) => {
369                    check_and_trigger_callback(
370                        process_req,
371                        &ProcessEvent::KillError,
372                        &process_data,
373                    );
374                }
375            }
376        }
377        _error => {
378            let reader = stdout_reader.as_ref();
379            if reader.err().is_some() {
380                process_data
381                    .line
382                    .push_str(format!("{:?}", reader.err().unwrap()).as_str());
383            }
384            check_and_trigger_callback(process_req, &ProcessEvent::StartError, &process_data);
385        }
386    }
387    process_data.request = None;
388    process_data.reader = None;
389    process_result
390}
391
392/// handle pipeline based multiple command lines
393fn handle_pipeline(request: &Arc<ProcessRequest>) -> Expression {
394    let cmd_line = &request.cmd_line;
395    let use_shell = request.use_shell;
396    let mut cmd_pipeline;
397    if use_shell {
398        cmd_pipeline = sh_vector(&cmd_line[0]);
399    } else {
400        let cli = vec_string_to_osstring(&cmd_line[0]);
401        cmd_pipeline = cmd(&cli[0], &cli[1..]);
402    }
403    if cmd_line.len() > 1 {
404        let mut cmd_itr = cmd_line.iter();
405        cmd_itr.next();
406        for command in cmd_itr {
407            if use_shell {
408                cmd_pipeline = cmd_pipeline.pipe(sh_vector(&command));
409            } else {
410                let cli = vec_string_to_osstring(&command);
411                cmd_pipeline = cmd_pipeline.pipe(cmd(&cli[0], &cli[1..]));
412            }
413        }
414    }
415    cmd_pipeline
416}
417
418/// check if the callback is registered and if yes then trigger it wi the supplied data
419fn check_and_trigger_callback(
420    request: &Arc<ProcessRequest>,
421    event: &ProcessEvent,
422    data: &ProcessData,
423) -> ProcessResult {
424    if request.callback.as_ref().is_some() {
425        return request.callback.as_ref().unwrap()(event, data);
426    };
427    ProcessResult::new()
428}
429
430/// create and run a shell based command, using vector of cmd and arguments
431fn sh_vector(command: &Vec<String>) -> Expression {
432    let argv = shell_command_argv_vector(command.into());
433    cmd(&argv[0], &argv[1..])
434}
435
/// Build the argv that runs `command` through the POSIX shell: `/bin/sh -c <command string>`.
///
/// `sh -c` takes only the first operand after `-c` as the command string; any
/// further operands become `$0`, `$1`, ... of that command. Passing the parts
/// as separate arguments would therefore run just the first part. The parts
/// are joined with single spaces into one command string instead, so that
/// e.g. `["echo", "stdout", ">&2"]` runs `echo stdout >&2`.
#[cfg(unix)]
fn shell_command_argv_vector(command: &Vec<String>) -> Vec<OsString> {
    vec![
        OsString::from("/bin/sh"),
        OsString::from("-c"),
        OsString::from(command.join(" ")),
    ]
}
444
/// Build the argv that runs `command` through the Windows command interpreter:
/// `%COMSPEC% /C <command parts...>`, falling back to `cmd.exe` when `COMSPEC` is not set.
#[cfg(windows)]
fn shell_command_argv_vector(command: &Vec<String>) -> Vec<OsString> {
    let interpreter = std::env::var_os("COMSPEC").unwrap_or_else(|| "cmd.exe".into());
    let mut shell_argv: Vec<OsString> = Vec::with_capacity(command.len() + 2);
    shell_argv.push(interpreter);
    shell_argv.push("/C".into());
    shell_argv.extend(vec_string_to_osstring(command));
    shell_argv
}
454
/// Convert a vector of [`String`] into a vector of [`OsString`], element by
/// element and in the same order.
fn vec_string_to_osstring(input: &Vec<String>) -> Vec<OsString> {
    let mut converted = Vec::with_capacity(input.len());
    for text in input {
        converted.push(OsString::from(text));
    }
    converted
}
459
#[cfg(test)]
mod tests {
    use crate::{ProcessData, ProcessEvent, ProcessRequest, ProcessResult};
    use std::sync::Arc;

    /// Turn string literals into the owned form used by `ProcessRequest::cmd_line`.
    fn argv(parts: &[&str]) -> Vec<String> {
        parts.iter().map(|part| String::from(*part)).collect()
    }

    /// Shell-mode request using [`log_event`] as its callback.
    fn shell_request(
        request_id: u32,
        cmd_line: Vec<Vec<String>>,
        non_blocking_mode: bool,
    ) -> ProcessRequest {
        ProcessRequest {
            request_id,
            use_shell: true,
            non_blocking_mode,
            cmd_line,
            callback: Some(Arc::new(log_event)),
        }
    }

    /// Callback shared by all requests: prints every event and, on the first
    /// output line, asks for the process to exit while handing back some data.
    fn log_event(status: &ProcessEvent, data: &ProcessData) -> ProcessResult {
        let request_id = data.request.as_ref().unwrap().request_id;
        match status {
            ProcessEvent::Started => {
                println!(
                    "Event {:?} | req-id {}  | Pids: {:?}",
                    status,
                    request_id,
                    data.child_pids()
                );
                ProcessResult::new()
            }
            ProcessEvent::IOData => {
                println!(
                    "Event {:?} | req-id {} | # {} : {}",
                    status, request_id, data.line_number, data.line
                );
                // Exit request plus custom data; killing via `data.kill()` would also work.
                let mut result = ProcessResult::new();
                result.set_exit_flag_and_success(true, Ok(true));
                result.data_num = Some(8111981);
                result.data_vec_str = Some(vec![String::from("I found my hidden data!")]);
                result
            }
            other => {
                if data.line.is_empty() {
                    println!("Event {:?} | req-id {}", other, request_id);
                } else {
                    println!(
                        "Event {:?} | req-id {} | additional detail(s): {}",
                        other, request_id, data.line
                    );
                }
                ProcessResult::new()
            }
        }
    }

    /// Smoke test driving blocking, non-blocking, pipeline and error scenarios.
    /// The command lines use Windows shell commands (`timeout /t`, `dir`, `calc`).
    #[test]
    pub fn test_using_sh_output_streaming_new_version() {
        let request1 = shell_request(
            121,
            vec![argv(&[
                "echo", "stdout", "&", "timeout", "/t", "3", "&", "echo", "stderr", ">&2",
            ])],
            false,
        );
        let request2 = shell_request(
            151,
            vec![argv(&[
                "echo", "stdout", "&", "timeout", "/t", "2", "&", "echo", "stderr", ">&2",
            ])],
            true,
        );

        // Non-blocking mode: start, then wait on the join handle for the real result.
        let mut process_result = ProcessRequest::start(request2);
        println!("Returned from Start! of non blocking");

        let internal_data = match process_result.join_handle.take() {
            Some(Ok(handle)) => {
                let joined = handle.join().unwrap();
                println!("Start - join waiting over in non blocking mode");
                joined
            }
            Some(Err(spawn_error)) => {
                let mut failed = ProcessResult::new();
                failed.success = Err(spawn_error);
                println!("Start - Error in non blocking mode");
                failed
            }
            None => process_result,
        };
        println!("result dump : {:?}", internal_data);

        // Blocking mode
        let _result1 = ProcessRequest::start(request1);
        println!("Returned from Start! of blocking");

        // Pipeline: list the current directory and sort the piped output
        println!(
            "test_using_sh_output_streaming, demo pipe-line {:?}",
            ProcessRequest::start(shell_request(
                161,
                vec![argv(&["dir"]), argv(&["sort"])],
                true
            ))
        );

        // Double quotes inside a single command string
        let request4 = shell_request(171, vec![argv(&[r#"echo "Sandy" "#])], true);
        println!(
            "test_using_sh_output_streaming , demo double quotes {:?}",
            ProcessRequest::start(request4)
        );

        // An empty command line must be reported as a start error
        let request5 = shell_request(181, vec![Vec::new()], true);
        println!(
            "test_using_sh_output_streaming, no arguments means start error {:?}",
            ProcessRequest::start(request5)
        );

        println!(
            "test_using_sh_output_streaming, start calc in windows {:?}",
            ProcessRequest::start(shell_request(191, vec![argv(&["calc"])], false))
        );
    }
}