haystackclientlib/
lib.rs

1use nom::{
2    branch::alt, bytes::complete::tag, character::complete::{alphanumeric1, anychar, space0, space1}, combinator::{map, peek, recognize}, error::{ErrorKind, ParseError}, multi::{many_till, separated_list1}, sequence::{pair, separated_pair, terminated}, Err, IResult, InputLength
3};
4
5use futures::{future::{AbortHandle, Abortable}, TryFutureExt};
6
7use scram;
8use url;
9use base64;
10use std::fmt;
11use std::str;
12
13use std::collections::HashMap;
14use std::str::FromStr;
15
16use std::sync::{Arc};
17use tokio::sync::{Mutex,Semaphore};
18
19use tokio::sync::mpsc;
20use anyhow::{anyhow, Result as AnyResult};
21
22pub mod ops;
23use ops::{FStr, HaystackOpTxRx, HaystackResponse};
24
25static BASE64_CONFIG: base64::Config = base64::URL_SAFE_NO_PAD;
26
/// Wire format used for grids exchanged with the server.
///
/// The variant selects the `Content-Type` header attached to outgoing
/// requests (`text/zinc` or `application/json`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum GridFormat {
    /// Zinc text encoding (`text/zinc`).
    Zinc,
    /// JSON encoding (`application/json`).
    Json,
}
32
33#[derive(Debug)]
34pub enum Error<'a> {
35    RQW(reqwest::Error),
36    URI(url::ParseError),
37    FMT(fmt::Error),
38    MSG(&'a str),
39    HTTP(nom::Err<(String,nom::error::ErrorKind)>),
40    SCRAM(&'a str),
41    SCRAMState(scram::Error),
42    SCRAMDecode(base64::DecodeError),
43    SCRAMBytesToStr(std::str::Utf8Error),
44    HaystackErr,
45    PoisonedLock(&'a str)
46}
47
48pub struct HSession {
49    uri: url::Url,
50    grid_format: GridFormat,
51    username: String,
52    password: String,
53    auth_info: Arc<Mutex<Option<String>>>,
54    // semaphore: Arc<Semaphore>,
55    _http_client: reqwest::Client,
56}
57
/// A grid as handled by this crate.
#[derive(Debug)]
pub enum Grid {
    /// The grid kept as an unparsed string.
    Raw(String),
}
62
63async fn new_hs_session<'a>(uri: String, username: String, password: String, accept_invalid_certs: bool, existing_session: Arc<Mutex<Option<String>>>, buffer: Option<usize>) -> Result<(AbortHandle,mpsc::Sender<HaystackOpTxRx>, Option<String>),Error<'a>> {
64    let (tx, mut rx) = mpsc::channel::<HaystackOpTxRx>(buffer.unwrap_or(10000));
65
66    let uri_member = url::Url::parse(&uri).map_err( |e| Error::URI(e))?;
67    let mut http_client = reqwest::Client::builder();
68    
69    http_client = http_client
70        .danger_accept_invalid_certs(accept_invalid_certs);
71    
72    let http_client = http_client
73        .build()
74        .unwrap();
75    
76    let mut hs_session = HSession {
77        uri: uri_member,
78        grid_format: GridFormat::Zinc,
79        username: username,
80        password: password,
81        auth_info: existing_session,
82        // semaphore: Arc::new(Semaphore::new(1)),
83        _http_client: http_client,
84    };
85
86    let auth_token: Option<String>;
87    if !hs_session.is_authenticated().await {
88        auth_token = Some(hs_session._authenticate().await.unwrap());
89    } else {
90        auth_token = None;
91    }
92
93    let (abort_handle, abort_registration) = AbortHandle::new_pair();
94    let future = Abortable::new(async move {
95
96        let semaphore = Arc::new(Semaphore::new(1));
97        while let Some(op) = rx.recv().await {
98            let _permit = semaphore.clone().acquire_owned().await.unwrap();
99            if !hs_session.is_authenticated().await {
100                let _ = hs_session._authenticate().await.unwrap();
101                // drop(_permit);
102            }
103            drop(_permit);
104
105            let ctx = HTTPContext {
106                client: hs_session._http_client.clone(),
107                auth_info: hs_session.auth_info.clone(),
108                uri: hs_session.uri.clone(),
109                grid_format: hs_session.grid_format.clone(),
110            };
111
112            tokio::spawn(async move {
113                match HSession::_request(ctx,op).await {
114                    Ok((op,res)) => {
115                        if op.resp_tx.is_closed() {
116                            //panic!("Sender for Response CLOSED. Won't be able to send");
117                            //return Err(anyhow!("Sender for Response CLOSED. Won't be able to send"));
118                            eprintln!("Sender for Response CLOSED. Won't be able to send");
119                        }
120                        // let sent_resp_res = op.resp_tx.send(HaystackResponse::Raw(res.to_string()));
121                        if let Err(e) = op.resp_tx.send(HaystackResponse::Raw(res.to_string())) {
122                            //panic!("Handling failed requests to channel not supported!");
123                            //return Err(anyhow!("Handling failed requests to channel not supported! {}",e));
124                            eprintln!("Handling failed requests to channel not supported! {}",e);
125                        };
126                        //Ok(())
127                    },
128                    Err(e) => {
129                        //panic!("Handling failed requests not supported!");
130                        //Err(anyhow!("Handling failed requests not supported! {:?}",e))
131                        eprintln!("Handling failed requests not supported! {:?}",e);
132                    }
133                }
134            });
135        }
136    }, abort_registration);
137
138    tokio::spawn(future);
139
140    Ok((abort_handle,tx, auth_token))
141}
142
143struct HTTPContext {
144    client: reqwest::Client,
145    auth_info: Arc<Mutex<Option<String>>>,
146    uri: url::Url,
147    grid_format: GridFormat,
148}
149
150impl <'a>HSession {
151    pub async fn new(uri: String, username: String, password: String, accept_invalid_certs: bool, existing_session: Arc<tokio::sync::Mutex<Option<String>>>, buffer: Option<usize>) -> Result<(AbortHandle,mpsc::Sender<HaystackOpTxRx>, Option<String>),Error<'a>> {
152        let mut url = url::Url::parse(&uri).map_err( |e| Error::URI(e))?;
153        if !url.path().ends_with('/') {
154            url.path_segments_mut()
155                .expect("Cannot modify URL path segments")
156                .push("");
157        }
158        new_hs_session(url.to_string(), username, password, accept_invalid_certs, existing_session, buffer).await
159    }
160
161    async fn _request(ctx: HTTPContext, haystack_op: HaystackOpTxRx) -> AnyResult<(HaystackOpTxRx,FStr<'a>)> {
162        let (method, op, body) = (haystack_op.priv_method(),haystack_op.priv_op(),haystack_op.priv_body());
163        //let auth_clone = ctx.auth_info.clone().lock_owned().await;
164        //let auth: String = auth_clone.to_owned().expect("No auth method in haystack session object");
165        let auth = {
166            let auth_clone = ctx.auth_info.clone().lock_owned().await;
167            auth_clone.to_owned().ok_or_else(|| anyhow!("No auth method in haystack session object"))?
168        };
169        
170        let req = ctx.client
171            .request(reqwest::Method::from_str(method.as_str())
172                            .map_err( |_| anyhow!("MSG: Invalid method"))?,ctx.uri.clone().join(op.as_str())
173                            .map_err( |e| anyhow!("Error::URI: {:?}",e))?)
174            .header("Authorization", auth);
175
176        let req = match method.as_str() {
177            "PUT" | "POST" | "PATCH" => req.body(
178                reqwest::Body::from(
179                body.ok_or(anyhow!("Request body not provided for POST, PUT or PATCH request"))?.to_string()
180                )
181            ),
182            _ => req
183        };
184
185        let req = match ctx.grid_format {
186            GridFormat::Zinc => req.header("Content-Type","text/zinc"),
187            GridFormat::Json => req.header("Content-Type","application/json"),
188        };
189
190        let resp = req.send().await.map_err( |e| anyhow!("Error::RQW({})",e))?;
191
192        let res = resp.text().await.map_err( |e| anyhow!("Error::RQW({})",e))?;
193        Ok((haystack_op,res.into()))
194    }
195
196    async fn _authenticate(&mut self) -> AnyResult<String> {
197        // let client = reqwest::Client::new();
198        let client = self._http_client.clone();
199
200        let mut uname_b64 = String::new();
201        fmt::write(&mut uname_b64,format_args!(
202            "HELLO username={}",
203            base64::encode_config(self.username.as_bytes(),BASE64_CONFIG))
204        )
205        .or_else(|e| Err(anyhow!("Auth Error: Unable to format HELLO msg: \"{:?}\"",e)))?;
206
207        //let res = client.get(self.uri.clone().join("about").map_err( |e| Error::URI(e))?)
208        let url = self.uri.clone().join("about").map_err( |e| anyhow!("{:?}",e))?;
209        let res = client.get(url)
210            .header("Authorization", uname_b64.as_str())
211            .send().await.or_else(|e| Err(anyhow!("Auth Error: {:?}",e)))?;
212
213        let www_auth_header = res.headers().get("www-authenticate")
214            .ok_or(anyhow!("Server response missing \"www-authenticate\" 1:\nHEADERS: {:?}", res.headers()))?;
215
216        let input = www_auth_header.to_str()
217            .map_err(|e| anyhow!("http::header::value::ToStrError: {:?}", e))?;
218
219        let (input,_): (&str, &str) = terminated(
220            alt((tag("SCRAM"),tag("scram"))),space1
221        )(input).map_err(|e: nom::Err<(&str,nom::error::ErrorKind)>| anyhow!("{:?}",e))?;
222
223        let (input,www_auth_list) = separated_list1(
224            pair(tag(","),space0),
225            separated_pair(alphanumeric1,tag("="),recognize(many_till(anychar,peek(alt((tag(","),eof))))))
226        )(input).map_err( |e: nom::Err<(&str,nom::error::ErrorKind)>| anyhow!("{:?}",e))?;
227
228        let www_auth_map: HashMap<_, _> = www_auth_list.into_iter().collect();
229
230        let state = scram::ScramClient::new(
231            self.username.as_str(),
232            self.password.as_str(),
233            None
234        );
235
236        let (state, state_first) = state.client_first();
237
238        if !www_auth_map.contains_key("hash") {
239            return Err(anyhow!("SCRAM: SHA-256 not supported"));
240        } else if let Some(hash) = www_auth_map.get("hash") {
241            if *hash != "SHA-256" {
242                return Err(anyhow!("SCRAM: SHA-256 not supported"));
243            }
244        }
245
246        let mut data = String::new();
247        // TODO: Remove commented lines
248        // fmt::write(&mut data,format_args!(
249        //     "SCRAM handshakeToken={}, data={}",
250        //     www_auth_map.get("handshakeToken").ok_or(Error::MSG("\"handshakeToken\" missing from server response"))?,
251        //     base64::encode_config(state_first.as_bytes(),BASE64_CONFIG))
252        // ).map_err( |e| Error::FMT(e))?;
253
254        if let Some(hs_token) = www_auth_map.get("handshakeToken") {
255            fmt::write(&mut data,format_args!(
256                "SCRAM handshakeToken={}, data={}",
257                hs_token,
258                base64::encode_config(state_first.as_bytes(),BASE64_CONFIG))
259            ).map_err( |e| anyhow!("{:?}",e))?;
260        } else {
261            fmt::write(&mut data,format_args!(
262                "SCRAM data={}",
263                base64::encode_config(state_first.as_bytes(),BASE64_CONFIG))
264            ).map_err( |e| anyhow!("{:?}",e))?;
265        }
266
267        let res = client.get(self.uri.clone().join("about").map_err( |e| anyhow!("{:?}",e))?)
268            .header("Authorization", data.as_str())
269            .send().await.map_err( |e| anyhow!("{:?}",e))?;
270
271        let www_auth_header = res.headers().get("www-authenticate")
272            .ok_or(anyhow!("Server response missing \"www-authenticate\" 2:\nHEADERS: {:?}", res.headers()))?;
273
274        let input = www_auth_header.to_str().unwrap();
275
276        let (input,_): (&str, &str) = terminated(
277            alt((tag("SCRAM"),tag("scram"))),space1
278        )(input).map_err( |e: nom::Err<(&str,nom::error::ErrorKind)>| anyhow!("{:?}",e))?;
279
280        let (input,www_auth_list) = separated_list1(
281            pair(tag(","),space0),
282            separated_pair(alphanumeric1,tag("="),recognize(many_till(anychar,peek(alt((tag(","),eof))))))
283        )(input).map_err( |e: nom::Err<(&str,nom::error::ErrorKind)>| anyhow!("{:?}",e))?;
284
285        let www_auth_map: HashMap<_, &str> = www_auth_list.into_iter().collect();
286
287        let data_temp = www_auth_map.get("data").ok_or(anyhow!("\"data\" missing from server response"))?;
288
289        let data_temp_2 = base64::decode_config(
290            str::from_utf8(data_temp.as_bytes()).map_err( |e| anyhow!("SCRAM bytes to str: {:?}",e))?,
291            BASE64_CONFIG
292        ).map_err( |e| anyhow!("SCRAM decode: {:?}, payload: {:02X?}",e,data_temp.as_bytes()))?;
293
294        let data_temp_1 = str::from_utf8(&data_temp_2)
295            .map_err( |e| anyhow!("SCRAM bytes to str: {:?}",e))?;
296
297        let state = state.handle_server_first(data_temp_1)
298            .map_err( |e| anyhow!("SCRAM state: {:?}",e))?;
299
300        let (state, client_final) = state.client_final();
301
302        let mut data = String::new();
303        // TODO: Remove commented lines
304        // fmt::write(&mut data,format_args!(
305        //     "SCRAM handshakeToken={}, data={}",
306        //     www_auth_map.get("handshakeToken").ok_or(Error::MSG("\"handshakeToken\" missing from server response"))?,
307        //     base64::encode_config(client_final.as_bytes(),BASE64_CONFIG))
308        // ).map_err( |e| Error::FMT(e))?;
309
310        if let Some(hs_token) = www_auth_map.get("handshakeToken") {
311            fmt::write(&mut data,format_args!(
312                "SCRAM handshakeToken={}, data={}",
313                hs_token,
314                base64::encode_config(client_final.as_bytes(),BASE64_CONFIG))
315            ).map_err( |e| anyhow!("Format: {:?}",e))?;
316        } else {
317            fmt::write(&mut data,format_args!(
318                "SCRAM data={}",
319                base64::encode_config(client_final.as_bytes(),BASE64_CONFIG))
320            ).map_err( |e| anyhow!("Format: {:?}",e))?;
321        }
322
323        let res = client.get(self.uri.clone().join("about").map_err( |e| anyhow!("URI: {:?}",e))?)
324            .header("Authorization", data.as_str())
325            .send().await.map_err( |e| anyhow!("RQW: {:?}",e))?;
326
327        let authentication_info = (&res.headers()).get("authentication-info")
328            .ok_or(anyhow!("Server response missing \"authentication-info\": HEADERS: {:?}\nSTATUS: {:?}", res.headers(), res.status()))?;
329
330        let input = authentication_info.to_str().unwrap();
331
332        let (input,authentication_info_list) = separated_list1(
333            pair(tag(","),space0),
334            map(
335                separated_pair(alphanumeric1,tag("="),recognize(many_till(anychar,peek(alt((tag(","),eof)))))),
336                |(a,b):(&str,&str)| (a.to_owned(),b.to_owned())
337            )
338        )(input).map_err( |e: nom::Err<(&str,nom::error::ErrorKind)>| anyhow!("HTTP: {:?}",e))?;
339
340        let authentication_info_map: HashMap<String, String> = authentication_info_list.into_iter().collect();
341
342        let data_temp = authentication_info_map.get("data").ok_or(anyhow!("\"data\" missing from server response"))?;
343
344        let data_temp_2 = base64::decode_config(
345            str::from_utf8(data_temp.as_bytes()).map_err( |e| anyhow!("SCRAM bytes to str: {:?}",e))?,
346            BASE64_CONFIG
347        ).map_err( |e| anyhow!("SCRAM decode: {:?}, payload: {:02X?}",e,data_temp.as_bytes()))?;
348
349        let data_temp_1 = str::from_utf8(&data_temp_2)
350            .map_err( |e| anyhow!("SCRAM bytes to str: {:?}",e))?;
351
352        let () = state.handle_server_final(data_temp_1)
353            .map_err( |e| anyhow!("SCRAM state: {:?}",e))?;
354
355        let mut bearer_string = String::new();
356        fmt::write(&mut bearer_string,format_args!(
357            "BEARER authToken={}",
358            authentication_info_map.get("authToken")
359            .ok_or(anyhow!("\"authToken\" missing from server response"))?
360        )).map_err( |e| anyhow!("Format: {:?}",e))?;
361
362        *self.auth_info.clone().lock_owned().await = Some(bearer_string.clone());
363        Ok(bearer_string)
364    }
365
366    pub async fn is_authenticated(&self) -> bool {
367        self.auth_info.lock().await.is_some()
368    }
369}
370
371pub fn eof<I: InputLength + Copy, E: ParseError<I>>(input: I) -> IResult<I, I, E> {
372    if input.input_len() == 0 {
373      Ok((input, input))
374    } else {
375      Err(nom::Err::Error(E::from_error_kind(input, ErrorKind::Eof)))
376    }
377}
378
/// Integration tests. They talk to a live server at
/// `https://analytics.host.docker.internal/api/demo/` and fail without it.
#[cfg(test)]
mod tests {
    use rstest::*;
    use std::ops::DerefMut;
    use super::*;

    // TODO: Write test with close op that closes original session
    /// Opens a fresh session and hands out its operation sender.
    #[fixture]
    async fn client() -> Box<mpsc::Sender<ops::HaystackOpTxRx>> {
        let hs_session: (AbortHandle, mpsc::Sender<ops::HaystackOpTxRx>, Option<String>) = HSession::new(
                "https://analytics.host.docker.internal/api/demo/".to_owned(),
                "su".to_owned(),
                "password".to_owned(),
                true,
                Arc::new(Mutex::new(None)),
                None
            ).await.unwrap();
        Box::new(hs_session.1)
    }

    /// Queues `op` on the session produced by `client` and waits for the
    /// matching response on `resp`.
    ///
    /// Panics when no queue slot can be reserved or when the response channel
    /// yields an error, which fails the calling test.
    async fn send_and_receive<D, F, R, T, E>(client: F, op: HaystackOpTxRx, resp: R) -> T
        where F: std::future::Future<Output = D>,
            D: DerefMut<Target = mpsc::Sender<ops::HaystackOpTxRx>>,
            R: std::future::Future<Output = Result<T, E>>,
            E: std::fmt::Display {
        let sender = client.await;
        let permit = sender
            .reserve()
            .await
            .unwrap_or_else(|e| panic!("Failed to reserve permit: {}", e));
        permit.send(op);

        match resp.await {
            Ok(response) => response,
            Err(e) => panic!("Failed to receive response: {}", e),
        }
    }

    #[rstest]
    #[tokio::test]
    async fn about<D,F>(client: F)
        where F: std::future::Future<Output = D>,
            D: DerefMut<Target = mpsc::Sender<ops::HaystackOpTxRx>> {
        let (op,resp) = HaystackOpTxRx::about();
        let _response = send_and_receive(client, op, resp).await;
    }

    #[rstest]
    #[tokio::test]
    async fn ops<D, F>(client: F)
        where F: std::future::Future<Output = D>,
            D: DerefMut<Target = mpsc::Sender<ops::HaystackOpTxRx>> {
        let (op,resp) = HaystackOpTxRx::ops(None, None).unwrap();
        let _response = send_and_receive(client, op, resp).await;
    }

    #[rstest]
    #[tokio::test]
    async fn filetypes<D, F>(client: F)
        where F: std::future::Future<Output = D>,
            D: DerefMut<Target = mpsc::Sender<ops::HaystackOpTxRx>> {
        let (op,resp) = HaystackOpTxRx::filetypes(None,None).unwrap();
        let _response = send_and_receive(client, op, resp).await;
    }

    #[rstest]
    #[tokio::test]
    async fn read<D, F>(client: F)
        where F: std::future::Future<Output = D>,
            D: DerefMut<Target = mpsc::Sender<ops::HaystackOpTxRx>> {
        let (op,resp) = HaystackOpTxRx::read("point and his and temp".into(), Some(10)).unwrap();
        let _response = send_and_receive(client, op, resp).await;
    }

    #[rstest]
    #[tokio::test]
    async fn nav_root<D, F>(client: F)
        where F: std::future::Future<Output = D>,
            D: DerefMut<Target = mpsc::Sender<ops::HaystackOpTxRx>> {
        let (op,resp) = HaystackOpTxRx::nav(None).unwrap();
        let _response = send_and_receive(client, op, resp).await;
    }

    #[rstest]
    #[tokio::test]
    async fn nav_site<D, F>(client: F)
        where F: std::future::Future<Output = D>,
            D: DerefMut<Target = mpsc::Sender<ops::HaystackOpTxRx>> {
        let (op,resp) = HaystackOpTxRx::nav(Some("`equip:/Carytown`")).unwrap();
        let _response = send_and_receive(client, op, resp).await;
    }

    // TODO: add a test that reuses a single session sender for several
    // sequential operations (for example `nav` followed by `about`).

    #[rstest]
    #[tokio::test]
    async fn his_read<D, F>(client: F)
        where F: std::future::Future<Output = D>,
            D: DerefMut<Target = mpsc::Sender<ops::HaystackOpTxRx>> {
        let (op,resp) = HaystackOpTxRx::his_read("@p:demo:r:26464231-bea9f430","2019-01-01").unwrap();
        let _response = send_and_receive(client, op, resp).await;
    }

    // TODO: Close session
    /// Queues three operations concurrently on clones of one sender and
    /// checks that every one of them is accepted and answered.
    #[tokio::test]
    async fn spawn_multiple_tasks_in_new_session() {
        use futures::join;
        let (abort_handle,addr, _) = HSession::new(
            "https://analytics.host.docker.internal/api/demo/".to_owned(),
            "su".to_owned(),
            "password".to_owned(),
            true,
            Arc::new(Mutex::new(None)),
            None
        ).await.unwrap();

        let (nav_op,nav_resp) = HaystackOpTxRx::nav(None).unwrap();
        let (formats_op,formats_resp) = HaystackOpTxRx::filetypes(None, None).unwrap();
        let (about_op,about_resp) = HaystackOpTxRx::about();

        let nav_addr = addr.clone();
        let formats_addr = addr.clone();
        let about_addr = addr.clone();

        let (nav_sent,formats_sent,about_sent) = join!(
            nav_addr.send(nav_op),
            formats_addr.send(formats_op),
            about_addr.send(about_op),
        );

        assert!(
            nav_sent.is_ok() && formats_sent.is_ok() && about_sent.is_ok(),
            "One or more requests failed 1"
        );

        let (nav_res,formats_res,about_res) = join!(nav_resp, formats_resp, about_resp);

        assert!(
            nav_res.is_ok() && formats_res.is_ok() && about_res.is_ok(),
            "One or more requests failed 2"
        );

        abort_handle.abort()
    }
}
583        abort_handle.abort()
584    }
585}