1use crate::{
4 errors::{CatBridgeError, NetworkError, NetworkParseError},
5 mion::proto::{
6 parameter::{
7 well_known::{index_from_parameter_name, ParameterLocationSpecification},
8 DumpedMionParameters, MIONParameterAPIError, MionDumpParameters, SetMionParameters,
9 SetMionParametersResponse,
10 },
11 DEFAULT_MION_PARAMETER_PORT, MION_PARAMETER_TIMEOUT_SECONDS,
12 },
13};
14use bytes::{Bytes, BytesMut};
15use fnv::FnvHashMap;
16use std::net::Ipv4Addr;
17use tokio::{
18 io::{AsyncReadExt, AsyncWriteExt},
19 net::TcpStream,
20 time::{sleep, Duration},
21};
22
23pub async fn get_parameters(
40 mion_addr: Ipv4Addr,
41 parameter_port: Option<u16>,
42 timeout: Option<Duration>,
43) -> Result<DumpedMionParameters, CatBridgeError> {
44 get_parameters_with_logging_hooks(
45 mion_addr,
46 parameter_port,
47 timeout,
48 noop_tcp_session_made,
49 noop_connection_established,
50 noop_write_finished,
51 noop_read_finished,
52 )
53 .await
54}
55
56pub async fn get_parameters_with_logging_hooks<
76 TcpSessionHook,
77 ConnectionEstablishedHook,
78 WriteFinishedHook,
79 ReadFinishedHook,
80>(
81 mion_addr: Ipv4Addr,
82 parameter_port: Option<u16>,
83 timeout: Option<Duration>,
84 tcp_session_logging_hook: TcpSessionHook,
85 connection_established_logging_hook: ConnectionEstablishedHook,
86 write_finished_hook: WriteFinishedHook,
87 read_finished_hook: ReadFinishedHook,
88) -> Result<DumpedMionParameters, CatBridgeError>
89where
90 TcpSessionHook: Fn(u128) + Clone + Send + 'static,
91 ConnectionEstablishedHook: Fn(Ipv4Addr) + Clone + Send + 'static,
92 WriteFinishedHook: Fn(usize) + Clone + Send + 'static,
93 ReadFinishedHook: Fn(usize) + Clone + Send + 'static,
94{
95 let usable_timeout = timeout.unwrap_or(Duration::from_secs(MION_PARAMETER_TIMEOUT_SECONDS));
96 tcp_session_logging_hook(usable_timeout.as_millis());
98
99 tokio::select! {
100 res = get_parameters_without_timeout(
101 mion_addr,
102 parameter_port,
103 connection_established_logging_hook,
104 write_finished_hook,
105 read_finished_hook,
106 ) => { Ok(res.map(|(params, _stream)| params)?) }
107 () = sleep(usable_timeout) => {
108 Err(NetworkError::Timeout(usable_timeout).into())
109 }
110 }
111}
112
113pub async fn set_parameters<IterTy>(
131 parameters_to_set: IterTy,
132 mion_addr: Ipv4Addr,
133 parameter_port: Option<u16>,
134 timeout: Option<Duration>,
135) -> Result<SetMionParametersResponse, CatBridgeError>
136where
137 IterTy: Iterator<Item = (ParameterLocationSpecification, u8)>,
138{
139 set_parameters_with_logging_hooks(
140 parameters_to_set,
141 mion_addr,
142 parameter_port,
143 timeout,
144 noop_tcp_session_made,
145 noop_connection_established,
146 noop_write_finished,
147 noop_read_finished,
148 noop_set_value_hook,
149 noop_write_finished,
150 )
151 .await
152 .map(|(resp, _changed_values)| resp)
153}
154
155pub async fn set_parameters_and_get_changed_values<IterTy>(
173 parameters_to_set: IterTy,
174 mion_addr: Ipv4Addr,
175 parameter_port: Option<u16>,
176 timeout: Option<Duration>,
177) -> Result<(SetMionParametersResponse, FnvHashMap<usize, u8>), CatBridgeError>
178where
179 IterTy: Iterator<Item = (ParameterLocationSpecification, u8)>,
180{
181 set_parameters_with_logging_hooks(
182 parameters_to_set,
183 mion_addr,
184 parameter_port,
185 timeout,
186 noop_tcp_session_made,
187 noop_connection_established,
188 noop_write_finished,
189 noop_read_finished,
190 noop_set_value_hook,
191 noop_write_finished,
192 )
193 .await
194}
195
196#[allow(
216 clippy::too_many_arguments,
218)]
219pub async fn set_parameters_with_logging_hooks<
220 IterTy,
221 TcpSessionHook,
222 ConnectionEstablishedHook,
223 WriteFinishedHook,
224 ReadFinishedHook,
225 SetNewValueHook,
226 WriteSetFinishedHook,
227>(
228 parameters_to_set: IterTy,
229 mion_addr: Ipv4Addr,
230 parameter_port: Option<u16>,
231 timeout: Option<Duration>,
232 tcp_session_logging_hook: TcpSessionHook,
233 connection_established_logging_hook: ConnectionEstablishedHook,
234 write_finished_hook: WriteFinishedHook,
235 read_finished_hook: ReadFinishedHook,
236 set_new_value_hook: SetNewValueHook,
237 write_set_finished_hook: WriteSetFinishedHook,
238) -> Result<(SetMionParametersResponse, FnvHashMap<usize, u8>), CatBridgeError>
239where
240 IterTy: Iterator<Item = (ParameterLocationSpecification, u8)>,
241 TcpSessionHook: Fn(u128) + Clone + Send + 'static,
242 ConnectionEstablishedHook: Fn(Ipv4Addr) + Clone + Send + 'static,
243 WriteFinishedHook: Fn(usize) + Clone + Send + 'static,
244 ReadFinishedHook: Fn(usize) + Clone + Send + 'static,
245 SetNewValueHook: Fn(u8, u8, usize) + Clone + Send + 'static,
246 WriteSetFinishedHook: Fn(usize) + Clone + Send + 'static,
247{
248 let usable_timeout = timeout.unwrap_or(Duration::from_secs(MION_PARAMETER_TIMEOUT_SECONDS));
249 tcp_session_logging_hook(usable_timeout.as_millis());
251
252 let (got_parameters, stream) = tokio::select! {
253 res = get_parameters_without_timeout(
254 mion_addr,
255 parameter_port,
256 connection_established_logging_hook,
257 write_finished_hook,
258 read_finished_hook,
259 ) => { res }
260 () = sleep(usable_timeout) => {
261 Err(NetworkError::Timeout(usable_timeout))
262 }
263 }?;
264
265 let mut old_values_map = FnvHashMap::default();
266 let mut new_parameters = BytesMut::with_capacity(512);
267 new_parameters.extend_from_slice(got_parameters.get_raw_parameters());
268 for (location_spec, new_value) in parameters_to_set {
269 let location = match location_spec {
270 ParameterLocationSpecification::Index(idx) => usize::from(idx),
271 ParameterLocationSpecification::NameLike(name) => {
272 index_from_parameter_name(&name).ok_or(MIONParameterAPIError::NameNotKnown(name))?
273 }
274 };
275
276 let orig_value = got_parameters.get_raw_parameters()[location];
277 set_new_value_hook(orig_value, new_value, location);
278 old_values_map.insert(location, orig_value);
279 new_parameters[location] = new_value;
280 }
281
282 tokio::select! {
283 res = set_parameters_without_timeout(
284 new_parameters.freeze(),
285 stream,
286 write_set_finished_hook,
287 ) => { res.map(|success| (success, old_values_map)) }
288 () = sleep(usable_timeout) => {
289 Err(NetworkError::Timeout(usable_timeout).into())
290 }
291 }
292}
293
294async fn get_parameters_without_timeout<
295 ConnectionEstablishedHook,
296 WriteFinishedHook,
297 ReadFinishedHook,
298>(
299 mion_addr: Ipv4Addr,
300 parameter_port: Option<u16>,
301 connection_established_hook: ConnectionEstablishedHook,
302 write_finished_hook: WriteFinishedHook,
303 read_finished_hook: ReadFinishedHook,
304) -> Result<(DumpedMionParameters, TcpStream), NetworkError>
305where
306 ConnectionEstablishedHook: Fn(Ipv4Addr) + Clone + Send + 'static,
307 WriteFinishedHook: Fn(usize) + Clone + Send + 'static,
308 ReadFinishedHook: Fn(usize) + Clone + Send + 'static,
309{
310 let mut stream = TcpStream::connect((
311 mion_addr,
312 parameter_port.unwrap_or(DEFAULT_MION_PARAMETER_PORT),
313 ))
314 .await
315 .map_err(NetworkError::IO)?;
316 connection_established_hook(mion_addr);
317 stream.writable().await.map_err(NetworkError::IO)?;
318 stream
319 .write(&Bytes::from(MionDumpParameters::new()))
320 .await
321 .map_err(NetworkError::IO)?;
322
323 let expected_bytes_to_read = 520;
324 write_finished_hook(expected_bytes_to_read);
325
326 let mut resp_buff = BytesMut::with_capacity(expected_bytes_to_read);
327 let read_bytes = stream
328 .read_buf(&mut resp_buff)
329 .await
330 .map_err(NetworkError::IO)?;
331 read_finished_hook(read_bytes);
332 if read_bytes != expected_bytes_to_read {
333 return Err(NetworkParseError::NotEnoughData(
334 "DumpedMionParameters",
335 expected_bytes_to_read,
336 read_bytes,
337 resp_buff.freeze(),
338 )
339 .into());
340 }
341 let parameters = DumpedMionParameters::try_from(resp_buff.freeze())?;
342
343 Ok((parameters, stream))
344}
345
346async fn set_parameters_without_timeout<WriteFinishedHook>(
347 new_parameters: Bytes,
348 mut stream: TcpStream,
349 write_finished_hook: WriteFinishedHook,
350) -> Result<SetMionParametersResponse, CatBridgeError>
351where
352 WriteFinishedHook: Fn(usize) + Clone + Send + 'static,
353{
354 stream.writable().await.map_err(NetworkError::IO)?;
355 stream
356 .write(&Bytes::from(SetMionParameters::new(new_parameters)?))
357 .await
358 .map_err(NetworkError::IO)?;
359
360 let expected_bytes_to_read = 12;
361 write_finished_hook(expected_bytes_to_read);
362
363 let mut resp_buff = BytesMut::with_capacity(expected_bytes_to_read);
364 let read_bytes = stream
365 .read_buf(&mut resp_buff)
366 .await
367 .map_err(NetworkError::IO)?;
368 if read_bytes != expected_bytes_to_read {
369 return Err(NetworkParseError::NotEnoughData(
370 "SetMionParametersResponse",
371 expected_bytes_to_read,
372 read_bytes,
373 resp_buff.freeze(),
374 )
375 .into());
376 }
377 let response = SetMionParametersResponse::try_from(resp_buff.freeze())?;
378
379 Ok(response)
380}
381
382fn noop_tcp_session_made(_timeout: u128) {}
383
384fn noop_connection_established(_ip: Ipv4Addr) {}
385
386fn noop_write_finished(_expected_read: usize) {}
387
388fn noop_read_finished(_read_size: usize) {}
389
390fn noop_set_value_hook(_old_value: u8, _new_value: u8, _location: usize) {}