process_events_streaming/lib.rs
1use duct::{cmd, Expression, ReaderHandle};
2use std::ffi::OsString;
3use std::fmt::Error;
4use std::io::{BufRead, BufReader};
5
6use std::sync::Arc;
7use std::thread::JoinHandle;
8use std::{io, thread};
9
/// Various events associated with a process's life-cycle.
///
/// Every event is delivered to the registered callback together with a
/// [`ProcessData`] describing the process at that moment. The enum is a plain
/// tag without payload, so it is cheap to copy and can be compared directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProcessEvent {
    /// Default value placeholder
    _Unknown,
    /// Process is about to be started but is not running yet
    Starting,
    /// Process has been started
    Started,
    /// An error occurred while starting the process itself
    StartError,
    /// Process started, but an error occurred while reading its output
    IOError,
    /// Process started and the output reader reached EOF; no more output will arrive
    IOEof,
    /// Process started and a new line of output is available
    IOData,
    /// While handling [`ProcessEvent::IOData`] the callback returned a result whose
    /// `should_exit` is `Some(true)`, i.e. the consumer asked for the process to be stopped
    ExitRequested,
    /// The kill API was used to kill the process
    KillRequested,
    /// The process that was started earlier has now exited
    Exited,
    /// An error occurred while killing/stopping the process
    KillError,
}
38
39/// Various fields related to the process
40///
41pub struct ProcessData<'a> {
42 /// Request data
43 pub request: Option<Arc<ProcessRequest>>,
44 /// Line number from output of the Process's STDOUT & STDERR
45 pub line_number: i64,
46 /// A single line data from output of the Process's STDOUT & STDERR
47 pub line: String,
48 /// Internal reader handle for managing the process
49 reader: Option<&'a ReaderHandle>,
50}
51
52impl ProcessData<'_> {
53 /// Create a new instance of the [`ProcessData`]
54 ///
55 pub fn new() -> Self {
56 Self {
57 request: None,
58 line_number: 0,
59 line: String::new(),
60 reader: None,
61 }
62 }
63 /// Kill the running process
64 pub fn kill(&self) -> io::Result<()> {
65 Ok(if self.reader.is_some() {
66 check_and_trigger_callback(
67 &self.request.as_ref().unwrap(),
68 &ProcessEvent::KillRequested,
69 &self,
70 );
71 return self.reader.as_ref().unwrap().kill();
72 })
73 }
74
75 /// Get the list of child pids
76 pub fn child_pids(&self) -> Vec<u32> {
77 if self.reader.is_some() {
78 return self.reader.as_ref().unwrap().pids();
79 }
80 return vec![];
81 }
82}
83
/// Result data produced by a process execution.
///
/// The callback returns one of these for every event; the data fields are free
/// slots the callback can fill to hand custom values back to the caller.
#[derive(Debug)]
pub struct ProcessResult {
    /// In non-blocking mode, use this to join and wait for the process to complete
    pub join_handle: Option<io::Result<JoinHandle<ProcessResult>>>,
    /// Whether the process should be stopped, based on the callback's custom conditions
    pub should_exit: Option<bool>,
    /// Whether the process execution was successful for the desired outcome
    pub success: Result<bool, std::io::Error>,
    /// Data as a vector of strings
    pub data_vec_str: Option<Vec<String>>,
    /// Data as true/false
    pub data_bool: Option<bool>,
    /// Data as a numeric `i128` value
    pub data_num: Option<i128>,
    /// Data as an `f64` value
    pub data_decimal: Option<f64>,
}

impl ProcessResult {
    /// Create an empty result: every optional field is `None` and `success` is `Ok(false)`.
    pub fn new() -> Self {
        Self {
            join_handle: None,
            should_exit: None,
            success: Ok(false),
            data_vec_str: None,
            data_bool: None,
            data_num: None,
            data_decimal: None,
        }
    }

    /// Set the join handle.
    fn set_join_handle(&mut self, join_handle: Option<io::Result<JoinHandle<ProcessResult>>>) {
        self.join_handle = join_handle;
    }

    /// Set the exit flag and the success value in one call.
    ///
    /// Public so that callbacks written outside this crate can use it, as the
    /// usage example for starting a request does.
    pub fn set_exit_flag_and_success(
        &mut self,
        should_exit: bool,
        success: Result<bool, std::io::Error>,
    ) {
        self.should_exit = Some(should_exit);
        self.success = success;
    }
}

impl Default for ProcessResult {
    /// Equivalent to [`ProcessResult::new`].
    fn default() -> Self {
        Self::new()
    }
}
131
// NOTE(review): `callback` is an `Arc<dyn Fn ...>` without `Send`/`Sync` bounds, so
// these two impls assert thread-safety that the compiler cannot verify. They are what
// allows `ProcessRequest::start` to move the request onto a spawned thread in
// non-blocking mode. A callback that captures non-thread-safe state is therefore
// presumably unsound in that mode — confirm, and consider bounding the callback
// type with `Send + Sync` in a future breaking release.
unsafe impl Sync for ProcessRequest {}
unsafe impl Send for ProcessRequest {}

/// A request structure to start a process
pub struct ProcessRequest {
    /// Custom unique numeric id to relate the various callbacks for a particular process execution session
    pub request_id: u32,
    /// Use shell mode (each command line is run through the platform shell) or direct
    /// mode (the first element of a command line is the executable, the rest its arguments)
    pub use_shell: bool,
    /// Use blocking mode, or non-blocking mode which runs the process on an internal thread
    pub non_blocking_mode: bool,
    /// (2D Array) Vector of command lines along with their arguments. For a single command line one vector element is enough. For the pipeline use case, where the output of one command is fed to the next command, use a vector of command lines.
    pub cmd_line: Vec<Vec<String>>,
    /// Register a callback to receive the various events and the process output; use `None` for no callbacks
    pub callback: Option<Arc<dyn Fn(&ProcessEvent, &ProcessData) -> ProcessResult + 'static>>,
}
148
149impl ProcessRequest {
150 /**
151 Run a process based on the provided process request which is events based multi process execution(blocking & non-blocking modes) in parallel and with data streaming
152 Generates various events [`ProcessEvent`] according to the process's life-cycle, process's information and data [`ProcessData`] associated with that event
153 # Arguments
154 ProcessRequest : [`ProcessRequest`] // A request structure to start a process
155 # Return
156 ProcessResult : [`ProcessResult`] // Contains join handle for non-blocking and data variables
157 For Blocking mode use join_handle [`Option<io::Result<JoinHandle<ProcessResult>>>`], so the caller can join & wait for the process completion if needed!
158 In the callback set the custom data to be retrieved once process execution is over, which will be returned in response of the join call.
159 # Examples
160 ```
161 // Setup callback for the process events and data streaming
162 //
163 // use process_events_streaming::{ProcessRequest, ProcessResult, ProcessData, ProcessEvent};
164 // use std::{thread, time::Duration};
165 // let callback = |status: &ProcessEvent, data: &ProcessData| -> ProcessResult {
166 // match status {
167 // ProcessEvent::Started => {
168 // println!(
169 // "Event {:?} | req-id {} | Pids: {:?}",
170 // status,
171 // data.request.as_ref().unwrap().request_id,
172 // data.child_pids()
173 // );
174 // }
175 // ProcessEvent::IOData => {
176 // println!(
177 // "Event {:?} | req-id {} | # {} : {}",
178 // status,
179 // data.request.as_ref().unwrap().request_id,
180 // data.line_number,
181 // data.line
182 // );
183 // //now assume we want to exit the process with some data
184 // let mut result = ProcessResult::new();
185 // result.set_exit_flag_and_success(true, Ok(true));
186 // result.data_num = Some(8111981);
187 // result.data_vec_str = Some(vec![String::from("I found my hidden data!")]);
188 // return result;
189 //
190 // //demo how to kill/stop
191 // //_ = data.kill();
192 // }
193 // other => {
194 // if !data.line.is_empty() {
195 // println!(
196 // "Event {:?} | req-id {} | additional detail(s): {}",
197 // other,
198 // data.request.as_ref().unwrap().request_id,
199 // data.line
200 // );
201 // } else {
202 // println!(
203 // "Event {:?} | req-id {}",
204 // other,
205 // data.request.as_ref().unwrap().request_id
206 // );
207 // }
208 // }
209 // }
210 // ProcessResult::new()
211 // };
212 //
213 //
214 //
215 // let request2 = ProcessRequest {
216 // request_id: 151,
217 // callback: Some(Arc::new(callback)),
218 // use_shell: true,
219 // cmd_line: vec![vec![
220 // String::from("echo"),
221 // String::from("stdout"),
222 // String::from("&"),
223 // String::from("echo"),
224 // String::from("stderr"),
225 // String::from(">&2"),
226 // ]],
227 // non_blocking_mode: true,
228 // };
229 //
230 // // non Blocking mode
231 // let process_result = ProcessRequest::start(request2);
232 // println!("Returned from Start! of non blocking");
233 //
234 // let mut internal_data = ProcessResult::new();
235 // //check & wait for the non blocking mode
236 // if process_result.join_handle.is_some() {
237 // if process_result.join_handle.as_ref().unwrap().is_ok() {
238 // internal_data = process_result.join_handle.unwrap().unwrap().join().unwrap();
239 // println!(
240 // "Start - join waiting over in non blocking mode {:?}",
241 // internal_data
242 // );
243 // } else {
244 // internal_data.success = Err(process_result.join_handle.unwrap().err().unwrap());
245 // println!(
246 // "Start - Error in non blocking mode {:?}",
247 // internal_data.success
248 // );
249 // }
250 // } else {
251 // internal_data = process_result;
252 // }
253 // println!("result dump : {:?}", internal_data);
254 //
255 // println!(
256 // "test_using_sh_output_streaming, start calc in windows {:?}",
257 // ProcessRequest::start(ProcessRequest {
258 // request_id: 191,
259 // callback: Some(Arc::new(callback)),
260 // use_shell: true,
261 // cmd_line: vec![vec![String::from("calc")]],
262 // non_blocking_mode: false,
263 // }));
264 ```
265 */
266 pub fn start(process_request: ProcessRequest) -> ProcessResult {
267 let request = Arc::new(process_request);
268 if request.non_blocking_mode {
269 let join_handle = thread::Builder::new()
270 .name(format!("pes_th_rq_{}", request.request_id).into())
271 .spawn(move || {
272 let response = start_process(request);
273 return response;
274 });
275 let mut result = ProcessResult::new();
276 result.set_join_handle(Some(join_handle));
277 return result;
278 } else {
279 return start_process(request);
280 }
281 }
282}
283
284fn start_process(request: Arc<ProcessRequest>) -> ProcessResult {
285 let mut process_result = ProcessResult::new();
286 let mut process_data = ProcessData::new();
287 process_data.line.clear();
288 process_data.request = Some(Arc::clone(&request));
289 if request.as_ref().cmd_line.len() == 0 || request.as_ref().cmd_line[0].len() == 0 {
290 process_data
291 .line
292 .push_str(format!("{:?}", "Command line - arguments are unavailable!").as_str());
293 return check_and_trigger_callback(&request, &ProcessEvent::StartError, &process_data);
294 }
295 process_data.line.push_str(
296 format!(
297 "Executing in thread-context -> id: {:?}, name: {:?}",
298 thread::current().id(),
299 thread::current().name()
300 )
301 .as_str(),
302 );
303 process_result = check_and_trigger_callback(&request, &ProcessEvent::Starting, &process_data);
304
305 let process_req = &request;
306 let stdout_reader = handle_pipeline(&request).stderr_to_stdout().reader();
307 if stdout_reader.as_ref().is_ok() {
308 process_data.reader = Some(stdout_reader.as_ref().unwrap());
309 }
310 match stdout_reader.as_ref() {
311 Ok(stdout_reader) => {
312 process_result =
313 check_and_trigger_callback(process_req, &ProcessEvent::Started, &process_data);
314 let mut buffer_reader = BufReader::new(stdout_reader);
315 loop {
316 process_data.line.clear();
317 let result = buffer_reader.read_line(&mut process_data.line);
318 match result {
319 Ok(result) if result == 0 => {
320 check_and_trigger_callback(
321 process_req,
322 &ProcessEvent::IOEof,
323 &process_data,
324 );
325 break;
326 }
327 Ok(_result) => {
328 process_data.line_number += 1;
329 process_result = check_and_trigger_callback(
330 process_req,
331 &ProcessEvent::IOData,
332 &process_data,
333 );
334 match &process_result {
335 process_result
336 if process_result.should_exit.is_some()
337 && process_result.should_exit.unwrap() == true =>
338 {
339 check_and_trigger_callback(
340 process_req,
341 &ProcessEvent::ExitRequested,
342 &process_data,
343 );
344 break;
345 }
346
347 _other => {}
348 }
349 }
350 Err(error) => {
351 process_data.line.push_str(format!("{:?}", error).as_str());
352 check_and_trigger_callback(
353 process_req,
354 &ProcessEvent::IOError,
355 &process_data,
356 );
357 break;
358 }
359 }
360 }
361 process_data.line.clear();
362 let exit_result = stdout_reader.kill();
363
364 match exit_result {
365 Ok(_) => {
366 check_and_trigger_callback(process_req, &ProcessEvent::Exited, &process_data);
367 }
368 Err(_) => {
369 check_and_trigger_callback(
370 process_req,
371 &ProcessEvent::KillError,
372 &process_data,
373 );
374 }
375 }
376 }
377 _error => {
378 let reader = stdout_reader.as_ref();
379 if reader.err().is_some() {
380 process_data
381 .line
382 .push_str(format!("{:?}", reader.err().unwrap()).as_str());
383 }
384 check_and_trigger_callback(process_req, &ProcessEvent::StartError, &process_data);
385 }
386 }
387 process_data.request = None;
388 process_data.reader = None;
389 process_result
390}
391
392/// handle pipeline based multiple command lines
393fn handle_pipeline(request: &Arc<ProcessRequest>) -> Expression {
394 let cmd_line = &request.cmd_line;
395 let use_shell = request.use_shell;
396 let mut cmd_pipeline;
397 if use_shell {
398 cmd_pipeline = sh_vector(&cmd_line[0]);
399 } else {
400 let cli = vec_string_to_osstring(&cmd_line[0]);
401 cmd_pipeline = cmd(&cli[0], &cli[1..]);
402 }
403 if cmd_line.len() > 1 {
404 let mut cmd_itr = cmd_line.iter();
405 cmd_itr.next();
406 for command in cmd_itr {
407 if use_shell {
408 cmd_pipeline = cmd_pipeline.pipe(sh_vector(&command));
409 } else {
410 let cli = vec_string_to_osstring(&command);
411 cmd_pipeline = cmd_pipeline.pipe(cmd(&cli[0], &cli[1..]));
412 }
413 }
414 }
415 cmd_pipeline
416}
417
418/// check if the callback is registered and if yes then trigger it wi the supplied data
419fn check_and_trigger_callback(
420 request: &Arc<ProcessRequest>,
421 event: &ProcessEvent,
422 data: &ProcessData,
423) -> ProcessResult {
424 if request.callback.as_ref().is_some() {
425 return request.callback.as_ref().unwrap()(event, data);
426 };
427 ProcessResult::new()
428}
429
430/// create and run a shell based command, using vector of cmd and arguments
431fn sh_vector(command: &Vec<String>) -> Expression {
432 let argv = shell_command_argv_vector(command.into());
433 cmd(&argv[0], &argv[1..])
434}
435
/// Create the argv for running `command` through the POSIX shell:
/// `/bin/sh -c "<command words joined by spaces>"`.
///
/// `sh -c` treats only its first operand as the script; any further operands become
/// `$0`, `$1`, … of that script. The words of `command` (which may include shell
/// operators such as `&` or `>&2`) are therefore joined into one script string,
/// otherwise only the first word would be executed.
#[cfg(unix)]
fn shell_command_argv_vector(command: &Vec<String>) -> Vec<OsString> {
    let script = command.join(" ");
    vec![
        OsString::from("/bin/sh"),
        OsString::from("-c"),
        OsString::from(script),
    ]
}
444
/// Create the argv for running `command` through the Windows command interpreter:
/// the interpreter named by `COMSPEC` (falling back to `cmd.exe`), then `/C`, then
/// the words of `command`.
#[cfg(windows)]
fn shell_command_argv_vector(command: &Vec<String>) -> Vec<OsString> {
    let interpreter = std::env::var_os("COMSPEC").unwrap_or_else(|| "cmd.exe".into());
    let mut argv: Vec<OsString> = Vec::with_capacity(command.len() + 2);
    argv.push(interpreter);
    argv.push("/C".into());
    argv.extend(vec_string_to_osstring(command));
    argv
}
454
/// Convert a vector of [`String`] into a vector of [`OsString`], preserving order.
fn vec_string_to_osstring(input: &Vec<String>) -> Vec<OsString> {
    let mut converted = Vec::with_capacity(input.len());
    for item in input {
        converted.push(OsString::from(item));
    }
    converted
}
459
#[cfg(test)]
mod tests {
    use crate::{ProcessData, ProcessEvent, ProcessRequest, ProcessResult};
    use std::sync::Arc;

    /// Value the callback stores in `data_num` when it asks for the process to exit.
    const HIDDEN_NUMBER: i128 = 8111981;

    /// Callback shared by all requests: logs every event and, on the first output
    /// line, asks for the process to exit while handing back some custom data.
    fn log_event(status: &ProcessEvent, data: &ProcessData) -> ProcessResult {
        let request_id = data.request.as_ref().unwrap().request_id;
        match status {
            ProcessEvent::Started => {
                println!(
                    "Event {:?} | req-id {} | Pids: {:?}",
                    status,
                    request_id,
                    data.child_pids()
                );
            }
            ProcessEvent::IOData => {
                println!(
                    "Event {:?} | req-id {} | # {} : {}",
                    status, request_id, data.line_number, data.line
                );

                // demo how to exit with data; `data.kill()` would kill/stop directly
                let mut result = ProcessResult::new();
                result.set_exit_flag_and_success(true, Ok(true));
                result.data_num = Some(HIDDEN_NUMBER);
                result.data_vec_str = Some(vec![String::from("I found my hidden data!")]);
                return result;
            }
            other => {
                if !data.line.is_empty() {
                    println!(
                        "Event {:?} | req-id {} | additional detail(s): {}",
                        other, request_id, data.line
                    );
                } else {
                    println!("Event {:?} | req-id {}", other, request_id);
                }
            }
        }
        ProcessResult::new()
    }

    /// Build a shell-mode request that reports to [`log_event`].
    fn shell_request(
        request_id: u32,
        cmd_line: Vec<Vec<&str>>,
        non_blocking_mode: bool,
    ) -> ProcessRequest {
        ProcessRequest {
            request_id,
            callback: Some(Arc::new(log_event)),
            use_shell: true,
            cmd_line: cmd_line
                .into_iter()
                .map(|command| command.into_iter().map(String::from).collect())
                .collect(),
            non_blocking_mode,
        }
    }

    /// Wait for a non-blocking run to finish and return the worker's result;
    /// a blocking run's result is returned unchanged.
    fn wait_for(mut started: ProcessResult) -> ProcessResult {
        match started.join_handle.take() {
            Some(Ok(handle)) => {
                let finished = handle.join().unwrap();
                println!("Start - join waiting over in non blocking mode");
                finished
            }
            Some(Err(error)) => {
                started.success = Err(error);
                println!("Start - Error in non blocking mode");
                started
            }
            None => started,
        }
    }

    #[test]
    pub fn test_using_sh_output_streaming_new_version() {
        // non Blocking mode
        let started = ProcessRequest::start(shell_request(
            151,
            vec![vec![
                "echo", "stdout", "&", "timeout", "/t", "2", "&", "echo", "stderr", ">&2",
            ]],
            true,
        ));
        println!("Returned from Start! of non blocking");
        let internal_data = wait_for(started);
        println!("result dump : {:?}", internal_data);
        // The callback asks to exit on the first output line and its data is returned.
        assert_eq!(internal_data.should_exit, Some(true));
        assert_eq!(internal_data.data_num, Some(HIDDEN_NUMBER));

        // blocking mode
        let result1 = ProcessRequest::start(shell_request(
            121,
            vec![vec![
                "echo", "stdout", "&", "timeout", "/t", "3", "&", "echo", "stderr", ">&2",
            ]],
            false,
        ));
        println!("Returned from Start! of blocking");
        assert!(result1.join_handle.is_none());
        assert_eq!(result1.data_num, Some(HIDDEN_NUMBER));

        // current dir listing and sort the piped output
        println!(
            "test_using_sh_output_streaming, demo pipe-line {:?}",
            wait_for(ProcessRequest::start(shell_request(
                161,
                vec![vec!["dir"], vec!["sort"]],
                true,
            )))
        );

        println!(
            "test_using_sh_output_streaming , demo double quotes {:?}",
            wait_for(ProcessRequest::start(shell_request(
                171,
                vec![vec![r#"echo "Sandy" "#]],
                true,
            )))
        );

        let no_arguments = wait_for(ProcessRequest::start(shell_request(181, vec![vec![]], true)));
        println!(
            "test_using_sh_output_streaming, no arguments means start error {:?}",
            no_arguments
        );
        // Nothing was started, so the callback never produced exit data.
        assert!(no_arguments.should_exit.is_none());
        assert!(no_arguments.data_num.is_none());

        println!(
            "test_using_sh_output_streaming, start calc in windows {:?}",
            ProcessRequest::start(shell_request(191, vec![vec!["calc"]], false))
        );
    }
}