prosa_hyper/client/
proc.rs1use std::sync::Arc;
2
3use opentelemetry::KeyValue;
4use prosa::{
5 core::{
6 adaptor::Adaptor,
7 error::ProcError,
8 msg::InternalMsg,
9 proc::{Proc, ProcBusParam as _, ProcConfig as _, proc, proc_settings},
10 },
11 io::stream::TargetSetting,
12};
13use serde::{Deserialize, Serialize};
14use tokio::task::JoinSet;
15use tracing::{info, warn};
16
17use crate::{
18 HyperProcError,
19 client::{adaptor::HyperClientAdaptor, socket::HyperClientSocket},
20};
21
22#[proc_settings]
24#[derive(Debug, Deserialize, Serialize, Clone)]
25pub struct HyperClientSettings {
26 pub service_name: String,
28 pub backends: Vec<TargetSetting>,
30 #[serde(default = "HyperClientSettings::default_min_socket")]
32 min_socket: u32,
33 #[serde(default = "HyperClientSettings::default_max_socket")]
35 max_socket: u32,
36 #[serde(default = "HyperClientSettings::default_http_timeout")]
38 http_timeout: u64,
39}
40
41impl HyperClientSettings {
42 fn default_min_socket() -> u32 {
43 1
44 }
45
46 fn default_max_socket() -> u32 {
47 20
48 }
49
50 fn default_http_timeout() -> u64 {
51 5000
52 }
53
54 pub fn new(service_name: String) -> Self {
56 HyperClientSettings {
57 service_name,
58 ..Default::default()
59 }
60 }
61
62 pub fn add_backend(&mut self, target: TargetSetting) {
64 self.backends.push(target);
65 }
66}
67
68#[proc_settings]
69impl Default for HyperClientSettings {
70 fn default() -> HyperClientSettings {
71 HyperClientSettings {
72 service_name: "hyper".to_string(),
73 backends: Vec::new(),
74 min_socket: Self::default_min_socket(),
75 max_socket: Self::default_max_socket(),
76 http_timeout: Self::default_http_timeout(),
77 }
78 }
79}
80
#[proc(settings = HyperClientSettings)]
/// Hyper HTTP client processor, configured through [`HyperClientSettings`].
///
/// The struct declares no field of its own; its behavior lives in the `Proc`
/// implementation.
pub struct HyperClientProc {}
84
85#[proc]
86impl<M, A> Proc<A> for HyperClientProc
87where
88 M: 'static
89 + std::marker::Send
90 + std::marker::Sync
91 + std::marker::Sized
92 + std::clone::Clone
93 + std::fmt::Debug
94 + prosa::core::msg::Tvf
95 + std::default::Default,
96 A: 'static + Adaptor + HyperClientAdaptor<M> + std::marker::Send + std::marker::Sync,
97{
98 async fn internal_run(&mut self) -> Result<(), Box<dyn ProcError + Send + Sync>> {
100 let adaptor = Arc::new(A::new(self)?);
102
103 self.proc.add_proc().await?;
105
106 let mut client_sockets = JoinSet::new();
108
109 let meter = self.get_proc_param().meter("hyper_client");
111 let observable_http_histogram = meter
112 .u64_histogram("prosa_hyper_cli_duration")
113 .with_description("Hyper HTTP client request duration histogram")
114 .build();
115 let observable_http_socket = meter
116 .u64_gauge("prosa_hyper_cli_socket")
117 .with_description("Hyper HTTP client socket counter")
118 .build();
119
120 if !self.settings.backends.is_empty() {
122 for backend in &self.settings.backends {
123 for _ in 0..self.settings.min_socket {
124 let client_socket =
125 HyperClientSocket::new(backend.clone(), self.settings.http_timeout);
126 client_socket.spawn(
127 &mut client_sockets,
128 self.proc.clone(),
129 adaptor.clone(),
130 self.settings.service_name.clone(),
131 observable_http_histogram.clone(),
132 );
133 }
134 }
135 } else {
136 return Err(Box::new(HyperProcError::Other(
137 "No backend configured for the Hyper client processor".to_string(),
138 )));
139 }
140
141 observable_http_socket.record(
143 client_sockets.len() as u64,
144 &[
145 KeyValue::new("proc", self.name().to_string()),
146 KeyValue::new("service", self.settings.service_name.clone()),
147 ],
148 );
149
150 loop {
151 tokio::select! {
152 Some(msg) = self.internal_rx_queue.recv() => {
153 match msg {
154 InternalMsg::Request(msg) => panic!(
155 "The hyper client processor[0] {} receive a request {:?}",
156 self.get_proc_id(),
157 msg
158 ),
159 InternalMsg::Response(msg) => panic!(
160 "The hyper client processor[0] {} receive a response {:?}",
161 self.get_proc_id(),
162 msg
163 ),
164 InternalMsg::Error(err_msg) => panic!(
165 "The hyper client processor[0] {} receive an error {:?}",
166 self.get_proc_id(),
167 err_msg
168 ),
169 InternalMsg::Command(_) => todo!(),
170 InternalMsg::Config => todo!(),
171 InternalMsg::Service(table) => self.service = table,
172 InternalMsg::Shutdown => {
173 adaptor.terminate();
174 self.proc.remove_proc(None).await?;
175 warn!("The Hyper client processor will shut down");
176 return Ok(());
177 }
178 }
179 },
180 Some(socket) = client_sockets.join_next(), if !client_sockets.is_empty() => {
181 match socket.map_err(|e| HyperProcError::Other(format!("Hyper client socket task join error: {}", e)))? {
182 Ok(s) => {
183 info!("A Hyper client socket has ended, restarting a new one: {s:?}");
184 s.spawn(
185 &mut client_sockets,
186 self.proc.clone(),
187 adaptor.clone(),
188 self.settings.service_name.clone(),
189 observable_http_histogram.clone()
190 );
191 },
192 Err(e) => {
193 warn!("A Hyper client socket task ended with error: {}", e);
194 }
195 }
196
197 observable_http_socket.record(
198 client_sockets.len() as u64,
199 &[
200 KeyValue::new("proc", self.name().to_string()),
201 KeyValue::new("service", self.settings.service_name.clone()),
202 ],
203 );
204 },
205 }
206 }
207 }
208}