Skip to main content

prosa_hyper/client/
proc.rs

1use std::sync::Arc;
2
3use opentelemetry::KeyValue;
4use prosa::{
5    core::{
6        adaptor::Adaptor,
7        error::ProcError,
8        msg::InternalMsg,
9        proc::{Proc, ProcBusParam as _, ProcConfig as _, proc, proc_settings},
10    },
11    io::stream::TargetSetting,
12};
13use serde::{Deserialize, Serialize};
14use tokio::task::JoinSet;
15use tracing::{info, warn};
16
17use crate::{
18    HyperProcError,
19    client::{adaptor::HyperClientAdaptor, socket::HyperClientSocket},
20};
21
22/// Hyper client processor settings
23#[proc_settings]
24#[derive(Debug, Deserialize, Serialize, Clone)]
25pub struct HyperClientSettings {
26    /// Service name
27    pub service_name: String,
28    /// List of backend services
29    pub backends: Vec<TargetSetting>,
30    /// Minimum number of socket connections per target
31    #[serde(default = "HyperClientSettings::default_min_socket")]
32    min_socket: u32,
33    /// Maximum number of socket connections per target
34    #[serde(default = "HyperClientSettings::default_max_socket")]
35    max_socket: u32,
36    /// Timeout for HTTP messages in milliseconds
37    #[serde(default = "HyperClientSettings::default_http_timeout")]
38    http_timeout: u64,
39}
40
41impl HyperClientSettings {
42    fn default_min_socket() -> u32 {
43        1
44    }
45
46    fn default_max_socket() -> u32 {
47        20
48    }
49
50    fn default_http_timeout() -> u64 {
51        5000
52    }
53
54    /// Create a new Hyper client settings listenning to a service
55    pub fn new(service_name: String) -> Self {
56        HyperClientSettings {
57            service_name,
58            ..Default::default()
59        }
60    }
61
62    /// Add a new Hyper client backend
63    pub fn add_backend(&mut self, target: TargetSetting) {
64        self.backends.push(target);
65    }
66}
67
68#[proc_settings]
69impl Default for HyperClientSettings {
70    fn default() -> HyperClientSettings {
71        HyperClientSettings {
72            service_name: "hyper".to_string(),
73            backends: Vec::new(),
74            min_socket: Self::default_min_socket(),
75            max_socket: Self::default_max_socket(),
76            http_timeout: Self::default_http_timeout(),
77        }
78    }
79}
80
81/// Hyper server processor
82#[proc(settings = HyperClientSettings)]
83pub struct HyperClientProc {}
84
85#[proc]
86impl<M, A> Proc<A> for HyperClientProc
87where
88    M: 'static
89        + std::marker::Send
90        + std::marker::Sync
91        + std::marker::Sized
92        + std::clone::Clone
93        + std::fmt::Debug
94        + prosa::core::msg::Tvf
95        + std::default::Default,
96    A: 'static + Adaptor + HyperClientAdaptor<M> + std::marker::Send + std::marker::Sync,
97{
98    /// Main loop of the processor
99    async fn internal_run(&mut self) -> Result<(), Box<dyn ProcError + Send + Sync>> {
100        // Initiate an adaptor for the Hyper client processor
101        let adaptor = Arc::new(A::new(self)?);
102
103        // Add proc main queue (id: 0)
104        self.proc.add_proc().await?;
105
106        // List of client sockets tasks
107        let mut client_sockets = JoinSet::new();
108
109        // Meter to log HTTP requests
110        let meter = self.get_proc_param().meter("hyper_client");
111        let observable_http_histogram = meter
112            .u64_histogram("prosa_hyper_cli_duration")
113            .with_description("Hyper HTTP client request duration histogram")
114            .build();
115        let observable_http_socket = meter
116            .u64_gauge("prosa_hyper_cli_socket")
117            .with_description("Hyper HTTP client socket counter")
118            .build();
119
120        // Create client sockets
121        if !self.settings.backends.is_empty() {
122            for backend in &self.settings.backends {
123                for _ in 0..self.settings.min_socket {
124                    let client_socket =
125                        HyperClientSocket::new(backend.clone(), self.settings.http_timeout);
126                    client_socket.spawn(
127                        &mut client_sockets,
128                        self.proc.clone(),
129                        adaptor.clone(),
130                        self.settings.service_name.clone(),
131                        observable_http_histogram.clone(),
132                    );
133                }
134            }
135        } else {
136            return Err(Box::new(HyperProcError::Other(
137                "No backend configured for the Hyper client processor".to_string(),
138            )));
139        }
140
141        // Update socket number after all creations
142        observable_http_socket.record(
143            client_sockets.len() as u64,
144            &[
145                KeyValue::new("proc", self.name().to_string()),
146                KeyValue::new("service", self.settings.service_name.clone()),
147            ],
148        );
149
150        loop {
151            tokio::select! {
152                Some(msg) = self.internal_rx_queue.recv() => {
153                    match msg {
154                        InternalMsg::Request(msg) => panic!(
155                            "The hyper client processor[0] {} receive a request {:?}",
156                            self.get_proc_id(),
157                            msg
158                        ),
159                        InternalMsg::Response(msg) => panic!(
160                            "The hyper client processor[0] {} receive a response {:?}",
161                            self.get_proc_id(),
162                            msg
163                        ),
164                        InternalMsg::Error(err_msg) => panic!(
165                            "The hyper client processor[0] {} receive an error {:?}",
166                            self.get_proc_id(),
167                            err_msg
168                        ),
169                        InternalMsg::Command(_) => todo!(),
170                        InternalMsg::Config => todo!(),
171                        InternalMsg::Service(table) => self.service = table,
172                        InternalMsg::Shutdown => {
173                            adaptor.terminate();
174                            self.proc.remove_proc(None).await?;
175                            warn!("The Hyper client processor will shut down");
176                            return Ok(());
177                        }
178                    }
179                },
180                Some(socket) = client_sockets.join_next(), if !client_sockets.is_empty() => {
181                    match socket.map_err(|e| HyperProcError::Other(format!("Hyper client socket task join error: {}", e)))? {
182                        Ok(s) => {
183                            info!("A Hyper client socket has ended, restarting a new one: {s:?}");
184                            s.spawn(
185                                &mut client_sockets,
186                                self.proc.clone(),
187                                adaptor.clone(),
188                                self.settings.service_name.clone(),
189                                observable_http_histogram.clone()
190                            );
191                        },
192                        Err(e) =>  {
193                            warn!("A Hyper client socket task ended with error: {}", e);
194                        }
195                    }
196
197                    observable_http_socket.record(
198                        client_sockets.len() as u64,
199                        &[
200                            KeyValue::new("proc", self.name().to_string()),
201                            KeyValue::new("service", self.settings.service_name.clone()),
202                        ],
203                    );
204                },
205            }
206        }
207    }
208}