1use nom::{
2 branch::alt, bytes::complete::tag, character::complete::{alphanumeric1, anychar, space0, space1}, combinator::{map, peek, recognize}, error::{ErrorKind, ParseError}, multi::{many_till, separated_list1}, sequence::{pair, separated_pair, terminated}, Err, IResult, InputLength
3};
4
5use futures::{future::{AbortHandle, Abortable}, TryFutureExt};
6
7use scram;
8use url;
9use base64;
10use std::fmt;
11use std::str;
12
13use std::collections::HashMap;
14use std::str::FromStr;
15
16use std::sync::{Arc};
17use tokio::sync::{Mutex,Semaphore};
18
19use tokio::sync::mpsc;
20use anyhow::{anyhow, Result as AnyResult};
21
22pub mod ops;
23use ops::{FStr, HaystackOpTxRx, HaystackResponse};
24
25static BASE64_CONFIG: base64::Config = base64::URL_SAFE_NO_PAD;
26
/// Wire format announced in the `Content-Type` header of outgoing requests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum GridFormat {
    /// Sent as `text/zinc`.
    Zinc,
    /// Sent as `application/json`.
    Json,
}
32
33#[derive(Debug)]
34pub enum Error<'a> {
35 RQW(reqwest::Error),
36 URI(url::ParseError),
37 FMT(fmt::Error),
38 MSG(&'a str),
39 HTTP(nom::Err<(String,nom::error::ErrorKind)>),
40 SCRAM(&'a str),
41 SCRAMState(scram::Error),
42 SCRAMDecode(base64::DecodeError),
43 SCRAMBytesToStr(std::str::Utf8Error),
44 HaystackErr,
45 PoisonedLock(&'a str)
46}
47
48pub struct HSession {
49 uri: url::Url,
50 grid_format: GridFormat,
51 username: String,
52 password: String,
53 auth_info: Arc<Mutex<Option<String>>>,
54 _http_client: reqwest::Client,
56}
57
/// A grid value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Grid {
    /// Grid carried as a plain string.
    // NOTE(review): presumably the unparsed response body — confirm against callers.
    Raw(String),
}
62
63async fn new_hs_session<'a>(uri: String, username: String, password: String, accept_invalid_certs: bool, existing_session: Arc<Mutex<Option<String>>>, buffer: Option<usize>) -> Result<(AbortHandle,mpsc::Sender<HaystackOpTxRx>, Option<String>),Error<'a>> {
64 let (tx, mut rx) = mpsc::channel::<HaystackOpTxRx>(buffer.unwrap_or(10000));
65
66 let uri_member = url::Url::parse(&uri).map_err( |e| Error::URI(e))?;
67 let mut http_client = reqwest::Client::builder();
68
69 http_client = http_client
70 .danger_accept_invalid_certs(accept_invalid_certs);
71
72 let http_client = http_client
73 .build()
74 .unwrap();
75
76 let mut hs_session = HSession {
77 uri: uri_member,
78 grid_format: GridFormat::Zinc,
79 username: username,
80 password: password,
81 auth_info: existing_session,
82 _http_client: http_client,
84 };
85
86 let auth_token: Option<String>;
87 if !hs_session.is_authenticated().await {
88 auth_token = Some(hs_session._authenticate().await.unwrap());
89 } else {
90 auth_token = None;
91 }
92
93 let (abort_handle, abort_registration) = AbortHandle::new_pair();
94 let future = Abortable::new(async move {
95
96 let semaphore = Arc::new(Semaphore::new(1));
97 while let Some(op) = rx.recv().await {
98 let _permit = semaphore.clone().acquire_owned().await.unwrap();
99 if !hs_session.is_authenticated().await {
100 let _ = hs_session._authenticate().await.unwrap();
101 }
103 drop(_permit);
104
105 let ctx = HTTPContext {
106 client: hs_session._http_client.clone(),
107 auth_info: hs_session.auth_info.clone(),
108 uri: hs_session.uri.clone(),
109 grid_format: hs_session.grid_format.clone(),
110 };
111
112 tokio::spawn(async move {
113 match HSession::_request(ctx,op).await {
114 Ok((op,res)) => {
115 if op.resp_tx.is_closed() {
116 eprintln!("Sender for Response CLOSED. Won't be able to send");
119 }
120 if let Err(e) = op.resp_tx.send(HaystackResponse::Raw(res.to_string())) {
122 eprintln!("Handling failed requests to channel not supported! {}",e);
125 };
126 },
128 Err(e) => {
129 eprintln!("Handling failed requests not supported! {:?}",e);
132 }
133 }
134 });
135 }
136 }, abort_registration);
137
138 tokio::spawn(future);
139
140 Ok((abort_handle,tx, auth_token))
141}
142
143struct HTTPContext {
144 client: reqwest::Client,
145 auth_info: Arc<Mutex<Option<String>>>,
146 uri: url::Url,
147 grid_format: GridFormat,
148}
149
150impl <'a>HSession {
151 pub async fn new(uri: String, username: String, password: String, accept_invalid_certs: bool, existing_session: Arc<tokio::sync::Mutex<Option<String>>>, buffer: Option<usize>) -> Result<(AbortHandle,mpsc::Sender<HaystackOpTxRx>, Option<String>),Error<'a>> {
152 let mut url = url::Url::parse(&uri).map_err( |e| Error::URI(e))?;
153 if !url.path().ends_with('/') {
154 url.path_segments_mut()
155 .expect("Cannot modify URL path segments")
156 .push("");
157 }
158 new_hs_session(url.to_string(), username, password, accept_invalid_certs, existing_session, buffer).await
159 }
160
161 async fn _request(ctx: HTTPContext, haystack_op: HaystackOpTxRx) -> AnyResult<(HaystackOpTxRx,FStr<'a>)> {
162 let (method, op, body) = (haystack_op.priv_method(),haystack_op.priv_op(),haystack_op.priv_body());
163 let auth = {
166 let auth_clone = ctx.auth_info.clone().lock_owned().await;
167 auth_clone.to_owned().ok_or_else(|| anyhow!("No auth method in haystack session object"))?
168 };
169
170 let req = ctx.client
171 .request(reqwest::Method::from_str(method.as_str())
172 .map_err( |_| anyhow!("MSG: Invalid method"))?,ctx.uri.clone().join(op.as_str())
173 .map_err( |e| anyhow!("Error::URI: {:?}",e))?)
174 .header("Authorization", auth);
175
176 let req = match method.as_str() {
177 "PUT" | "POST" | "PATCH" => req.body(
178 reqwest::Body::from(
179 body.ok_or(anyhow!("Request body not provided for POST, PUT or PATCH request"))?.to_string()
180 )
181 ),
182 _ => req
183 };
184
185 let req = match ctx.grid_format {
186 GridFormat::Zinc => req.header("Content-Type","text/zinc"),
187 GridFormat::Json => req.header("Content-Type","application/json"),
188 };
189
190 let resp = req.send().await.map_err( |e| anyhow!("Error::RQW({})",e))?;
191
192 let res = resp.text().await.map_err( |e| anyhow!("Error::RQW({})",e))?;
193 Ok((haystack_op,res.into()))
194 }
195
196 async fn _authenticate(&mut self) -> AnyResult<String> {
197 let client = self._http_client.clone();
199
200 let mut uname_b64 = String::new();
201 fmt::write(&mut uname_b64,format_args!(
202 "HELLO username={}",
203 base64::encode_config(self.username.as_bytes(),BASE64_CONFIG))
204 )
205 .or_else(|e| Err(anyhow!("Auth Error: Unable to format HELLO msg: \"{:?}\"",e)))?;
206
207 let url = self.uri.clone().join("about").map_err( |e| anyhow!("{:?}",e))?;
209 let res = client.get(url)
210 .header("Authorization", uname_b64.as_str())
211 .send().await.or_else(|e| Err(anyhow!("Auth Error: {:?}",e)))?;
212
213 let www_auth_header = res.headers().get("www-authenticate")
214 .ok_or(anyhow!("Server response missing \"www-authenticate\" 1:\nHEADERS: {:?}", res.headers()))?;
215
216 let input = www_auth_header.to_str()
217 .map_err(|e| anyhow!("http::header::value::ToStrError: {:?}", e))?;
218
219 let (input,_): (&str, &str) = terminated(
220 alt((tag("SCRAM"),tag("scram"))),space1
221 )(input).map_err(|e: nom::Err<(&str,nom::error::ErrorKind)>| anyhow!("{:?}",e))?;
222
223 let (input,www_auth_list) = separated_list1(
224 pair(tag(","),space0),
225 separated_pair(alphanumeric1,tag("="),recognize(many_till(anychar,peek(alt((tag(","),eof))))))
226 )(input).map_err( |e: nom::Err<(&str,nom::error::ErrorKind)>| anyhow!("{:?}",e))?;
227
228 let www_auth_map: HashMap<_, _> = www_auth_list.into_iter().collect();
229
230 let state = scram::ScramClient::new(
231 self.username.as_str(),
232 self.password.as_str(),
233 None
234 );
235
236 let (state, state_first) = state.client_first();
237
238 if !www_auth_map.contains_key("hash") {
239 return Err(anyhow!("SCRAM: SHA-256 not supported"));
240 } else if let Some(hash) = www_auth_map.get("hash") {
241 if *hash != "SHA-256" {
242 return Err(anyhow!("SCRAM: SHA-256 not supported"));
243 }
244 }
245
246 let mut data = String::new();
247 if let Some(hs_token) = www_auth_map.get("handshakeToken") {
255 fmt::write(&mut data,format_args!(
256 "SCRAM handshakeToken={}, data={}",
257 hs_token,
258 base64::encode_config(state_first.as_bytes(),BASE64_CONFIG))
259 ).map_err( |e| anyhow!("{:?}",e))?;
260 } else {
261 fmt::write(&mut data,format_args!(
262 "SCRAM data={}",
263 base64::encode_config(state_first.as_bytes(),BASE64_CONFIG))
264 ).map_err( |e| anyhow!("{:?}",e))?;
265 }
266
267 let res = client.get(self.uri.clone().join("about").map_err( |e| anyhow!("{:?}",e))?)
268 .header("Authorization", data.as_str())
269 .send().await.map_err( |e| anyhow!("{:?}",e))?;
270
271 let www_auth_header = res.headers().get("www-authenticate")
272 .ok_or(anyhow!("Server response missing \"www-authenticate\" 2:\nHEADERS: {:?}", res.headers()))?;
273
274 let input = www_auth_header.to_str().unwrap();
275
276 let (input,_): (&str, &str) = terminated(
277 alt((tag("SCRAM"),tag("scram"))),space1
278 )(input).map_err( |e: nom::Err<(&str,nom::error::ErrorKind)>| anyhow!("{:?}",e))?;
279
280 let (input,www_auth_list) = separated_list1(
281 pair(tag(","),space0),
282 separated_pair(alphanumeric1,tag("="),recognize(many_till(anychar,peek(alt((tag(","),eof))))))
283 )(input).map_err( |e: nom::Err<(&str,nom::error::ErrorKind)>| anyhow!("{:?}",e))?;
284
285 let www_auth_map: HashMap<_, &str> = www_auth_list.into_iter().collect();
286
287 let data_temp = www_auth_map.get("data").ok_or(anyhow!("\"data\" missing from server response"))?;
288
289 let data_temp_2 = base64::decode_config(
290 str::from_utf8(data_temp.as_bytes()).map_err( |e| anyhow!("SCRAM bytes to str: {:?}",e))?,
291 BASE64_CONFIG
292 ).map_err( |e| anyhow!("SCRAM decode: {:?}, payload: {:02X?}",e,data_temp.as_bytes()))?;
293
294 let data_temp_1 = str::from_utf8(&data_temp_2)
295 .map_err( |e| anyhow!("SCRAM bytes to str: {:?}",e))?;
296
297 let state = state.handle_server_first(data_temp_1)
298 .map_err( |e| anyhow!("SCRAM state: {:?}",e))?;
299
300 let (state, client_final) = state.client_final();
301
302 let mut data = String::new();
303 if let Some(hs_token) = www_auth_map.get("handshakeToken") {
311 fmt::write(&mut data,format_args!(
312 "SCRAM handshakeToken={}, data={}",
313 hs_token,
314 base64::encode_config(client_final.as_bytes(),BASE64_CONFIG))
315 ).map_err( |e| anyhow!("Format: {:?}",e))?;
316 } else {
317 fmt::write(&mut data,format_args!(
318 "SCRAM data={}",
319 base64::encode_config(client_final.as_bytes(),BASE64_CONFIG))
320 ).map_err( |e| anyhow!("Format: {:?}",e))?;
321 }
322
323 let res = client.get(self.uri.clone().join("about").map_err( |e| anyhow!("URI: {:?}",e))?)
324 .header("Authorization", data.as_str())
325 .send().await.map_err( |e| anyhow!("RQW: {:?}",e))?;
326
327 let authentication_info = (&res.headers()).get("authentication-info")
328 .ok_or(anyhow!("Server response missing \"authentication-info\": HEADERS: {:?}\nSTATUS: {:?}", res.headers(), res.status()))?;
329
330 let input = authentication_info.to_str().unwrap();
331
332 let (input,authentication_info_list) = separated_list1(
333 pair(tag(","),space0),
334 map(
335 separated_pair(alphanumeric1,tag("="),recognize(many_till(anychar,peek(alt((tag(","),eof)))))),
336 |(a,b):(&str,&str)| (a.to_owned(),b.to_owned())
337 )
338 )(input).map_err( |e: nom::Err<(&str,nom::error::ErrorKind)>| anyhow!("HTTP: {:?}",e))?;
339
340 let authentication_info_map: HashMap<String, String> = authentication_info_list.into_iter().collect();
341
342 let data_temp = authentication_info_map.get("data").ok_or(anyhow!("\"data\" missing from server response"))?;
343
344 let data_temp_2 = base64::decode_config(
345 str::from_utf8(data_temp.as_bytes()).map_err( |e| anyhow!("SCRAM bytes to str: {:?}",e))?,
346 BASE64_CONFIG
347 ).map_err( |e| anyhow!("SCRAM decode: {:?}, payload: {:02X?}",e,data_temp.as_bytes()))?;
348
349 let data_temp_1 = str::from_utf8(&data_temp_2)
350 .map_err( |e| anyhow!("SCRAM bytes to str: {:?}",e))?;
351
352 let () = state.handle_server_final(data_temp_1)
353 .map_err( |e| anyhow!("SCRAM state: {:?}",e))?;
354
355 let mut bearer_string = String::new();
356 fmt::write(&mut bearer_string,format_args!(
357 "BEARER authToken={}",
358 authentication_info_map.get("authToken")
359 .ok_or(anyhow!("\"authToken\" missing from server response"))?
360 )).map_err( |e| anyhow!("Format: {:?}",e))?;
361
362 *self.auth_info.clone().lock_owned().await = Some(bearer_string.clone());
363 Ok(bearer_string)
364 }
365
366 pub async fn is_authenticated(&self) -> bool {
367 self.auth_info.lock().await.is_some()
368 }
369}
370
371pub fn eof<I: InputLength + Copy, E: ParseError<I>>(input: I) -> IResult<I, I, E> {
372 if input.input_len() == 0 {
373 Ok((input, input))
374 } else {
375 Err(nom::Err::Error(E::from_error_kind(input, ErrorKind::Eof)))
376 }
377}
378
/// Integration tests. They log in to the server at the hard-coded URL below
/// and therefore only pass when that server is reachable.
#[cfg(test)]
mod tests {
    use rstest::*;
    use std::ops::DerefMut;
    use super::*;

    /// Queues `op` on the session and waits for its response, panicking on any failure.
    async fn send_and_receive<R, T, E>(
        sender: &mpsc::Sender<HaystackOpTxRx>,
        op: HaystackOpTxRx,
        resp: R,
    ) -> T
    where
        R: std::future::Future<Output = Result<T, E>>,
        E: std::fmt::Display,
    {
        let permit = sender
            .reserve()
            .await
            .unwrap_or_else(|e| panic!("Failed to reserve permit: {}", e));
        permit.send(op);

        resp.await
            .unwrap_or_else(|e| panic!("Failed to receive response: {}", e))
    }

    /// Opens a fresh session and hands out its operation sender.
    #[fixture]
    async fn client() -> Box<mpsc::Sender<ops::HaystackOpTxRx>> {
        let (_abort_handle, sender, _auth_token) = HSession::new(
            "https://analytics.host.docker.internal/api/demo/".to_owned(),
            "su".to_owned(),
            "password".to_owned(),
            true,
            Arc::new(Mutex::new(None)),
            None,
        )
        .await
        .unwrap();
        Box::new(sender)
    }

    #[rstest]
    #[tokio::test]
    async fn about<D, F>(client: F)
    where
        F: std::future::Future<Output = D>,
        D: DerefMut<Target = mpsc::Sender<ops::HaystackOpTxRx>>,
    {
        let (op, resp) = HaystackOpTxRx::about();
        let sender = client.await;
        send_and_receive(&*sender, op, resp).await;
    }

    #[rstest]
    #[tokio::test]
    async fn ops<D, F>(client: F)
    where
        F: std::future::Future<Output = D>,
        D: DerefMut<Target = mpsc::Sender<ops::HaystackOpTxRx>>,
    {
        let (op, resp) = HaystackOpTxRx::ops(None, None).unwrap();
        let sender = client.await;
        send_and_receive(&*sender, op, resp).await;
    }

    #[rstest]
    #[tokio::test]
    async fn filetypes<D, F>(client: F)
    where
        F: std::future::Future<Output = D>,
        D: DerefMut<Target = mpsc::Sender<ops::HaystackOpTxRx>>,
    {
        let (op, resp) = HaystackOpTxRx::filetypes(None, None).unwrap();
        let sender = client.await;
        send_and_receive(&*sender, op, resp).await;
    }

    #[rstest]
    #[tokio::test]
    async fn read<D, F>(client: F)
    where
        F: std::future::Future<Output = D>,
        D: DerefMut<Target = mpsc::Sender<ops::HaystackOpTxRx>>,
    {
        let (op, resp) = HaystackOpTxRx::read("point and his and temp".into(), Some(10)).unwrap();
        let sender = client.await;
        send_and_receive(&*sender, op, resp).await;
    }

    #[rstest]
    #[tokio::test]
    async fn nav_root<D, F>(client: F)
    where
        F: std::future::Future<Output = D>,
        D: DerefMut<Target = mpsc::Sender<ops::HaystackOpTxRx>>,
    {
        let (op, resp) = HaystackOpTxRx::nav(None).unwrap();
        let sender = client.await;
        send_and_receive(&*sender, op, resp).await;
    }

    #[rstest]
    #[tokio::test]
    async fn nav_site<D, F>(client: F)
    where
        F: std::future::Future<Output = D>,
        D: DerefMut<Target = mpsc::Sender<ops::HaystackOpTxRx>>,
    {
        let (op, resp) = HaystackOpTxRx::nav(Some("`equip:/Carytown`")).unwrap();
        let sender = client.await;
        send_and_receive(&*sender, op, resp).await;
    }

    #[rstest]
    #[tokio::test]
    async fn his_read<D, F>(client: F)
    where
        F: std::future::Future<Output = D>,
        D: DerefMut<Target = mpsc::Sender<ops::HaystackOpTxRx>>,
    {
        let (op, resp) = HaystackOpTxRx::his_read("@p:demo:r:26464231-bea9f430", "2019-01-01").unwrap();
        let sender = client.await;
        let _response = send_and_receive(&*sender, op, resp).await;
    }

    /// Queues three operations concurrently on one session and expects all to be answered.
    #[tokio::test]
    async fn spawn_multiple_tasks_in_new_session() {
        use futures::join;
        let (abort_handle, addr, _) = HSession::new(
            "https://analytics.host.docker.internal/api/demo/".to_owned(),
            "su".to_owned(),
            "password".to_owned(),
            true,
            Arc::new(Mutex::new(None)),
            None,
        )
        .await
        .unwrap();

        let (nav_op, nav_resp) = HaystackOpTxRx::nav(None).unwrap();
        let (formats_op, formats_resp) = HaystackOpTxRx::filetypes(None, None).unwrap();
        let (about_op, about_resp) = HaystackOpTxRx::about();

        let (nav_sent, formats_sent, about_sent) = join!(
            addr.send(nav_op),
            addr.send(formats_op),
            addr.send(about_op),
        );
        if nav_sent.is_err() || formats_sent.is_err() || about_sent.is_err() {
            panic!("One or more requests failed 1")
        }

        let (nav_res, formats_res, about_res) = join!(nav_resp, formats_resp, about_resp);
        if nav_res.is_err() || formats_res.is_err() || about_res.is_err() {
            panic!("One or more requests failed 2")
        }

        abort_handle.abort()
    }
}