1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
use std::future::Future;
use std::sync::Arc;

use futures::FutureExt;

use super::{ProcessControlHandler, Runnable, RuntimeError};

pub struct ProcessManager {
    processes: Vec<Arc<Box<dyn Runnable + Send + Sync + 'static>>>,
}

impl ProcessManager {
    pub fn new() -> Self {
        Self { processes: vec![] }
    }

    pub fn insert(&mut self, process: impl Runnable + Send + Sync + 'static) {
        self.processes.push(Arc::new(Box::new(process)));
    }
}

#[async_trait::async_trait]
impl Runnable for ProcessManager {
    async fn process_start(&self) -> Result<(), RuntimeError> {
        async fn wrap_proc<F, Fut>(
            proc: Arc<Box<dyn Runnable + Send + Sync + 'static>>,
            init_shutdown: F,
        ) -> Result<(), RuntimeError>
        where
            Fut: Future<Output = ()>,
            F: FnOnce() -> Fut,
        {
            let proc_name = proc.process_name().to_string();
            //tracing::info!("Start process {proc_name}");

            let proc = proc.to_owned();
            tokio::spawn(async move { proc.process_start().await })
                .then(|prev| async {
                    let prev = prev.map_err(|err| RuntimeError::Internal {
                        message: format!("tokio spawn join error: {err:?}"),
                    })?;
                    if prev.is_err() {
                        /*          tracing::error!(
                            "Process {proc_name} stopped unexpectedly: {:?}",
                            prev.as_ref().unwrap_err()
                        );*/
                        init_shutdown().await;
                    } else {
                        //tracing::info!("Process {proc_name} stopped");
                    }
                    prev
                })
                .await
        }

        let handle = self.process_handle();
        let process_futures = self
            .processes
            .iter()
            .map(|proc| wrap_proc(proc.clone(), || async { handle.shutdown().await }))
            .collect::<Vec<_>>();
        let _bars: Vec<_> = futures::future::join_all(process_futures).await;
        Ok(())
    }

    fn process_handle(&self) -> Box<dyn ProcessControlHandler> {
        Box::new(ProcessHandle {
            runtime_handles: self
                .processes
                .iter()
                .map(|proc| (proc.process_name(), proc.process_handle()))
                .collect(),
        })
    }
}

impl Default for ProcessManager {
    fn default() -> Self {
        Self::new()
    }
}

struct ProcessHandle<'a> {
    runtime_handles: Vec<(&'a str, Box<dyn ProcessControlHandler>)>,
}

#[async_trait::async_trait]
impl ProcessControlHandler for ProcessHandle<'_> {
    async fn shutdown(&self) {
        // TODO make shutdowns in parallel
        for (name, runtime_handle) in self.runtime_handles.iter() {
            //tracing::info!("Initiate shutdown on process {name}");
            runtime_handle.shutdown().await;
        }
    }

    async fn reload(&self) {
        // TODO make reloads in parallel
        for (name, runtime_handle) in self.runtime_handles.iter() {
            //tracing::info!("Initiate reload on process {name}");
            runtime_handle.reload().await;
        }
    }
}

impl Clone for ProcessHandle<'_> {
    fn clone(&self) -> Self {
        todo!()
    }
}