1#![warn(missing_docs)]
2pub use openiap_proto::errors::*;
25pub use openiap_proto::protos::*;
26pub use openiap_proto::*;
27pub use prost_types::Timestamp;
28pub use protos::flow_service_client::FlowServiceClient;
29use sqids::Sqids;
30use std::fmt::{Display,Formatter};
31use tokio::task::JoinHandle;
32use tokio_tungstenite::WebSocketStream;
33use tracing::{debug, error, info, trace};
34type StdError = Box<dyn std::error::Error + Send + Sync + 'static>;
35type Result<T, E = StdError> = ::std::result::Result<T, E>;
36use std::fs::File;
37use std::io::{Read, Write};
38use std::sync::atomic::{AtomicUsize, Ordering};
39use std::sync::Arc;
40use tokio::sync::Mutex;
41use tonic::transport::Channel;
42
43use tokio::sync::{mpsc, oneshot};
44
45use std::env;
46use std::time::Duration;
47
48#[cfg(feature = "otel")]
49mod otel;
50mod tests;
51mod ws;
52mod grpc;
53mod util;
54pub use crate::util::{set_otel_url, enable_tracing, disable_tracing};
55pub use crate::otel::{set_f64_observable_gauge, set_u64_observable_gauge, set_i64_observable_gauge, disable_observable_gauge};
56
57type QuerySender = oneshot::Sender<Envelope>;
58type StreamSender = mpsc::Sender<Vec<u8>>;
59type Sock = WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>;
60use futures::StreamExt;
61use async_channel::unbounded;
62const VERSION: &str = "0.0.35";
63
64
65#[derive(Clone)]
68pub struct Client {
69 connect_called: Arc<std::sync::Mutex<bool>>,
71 runtime: Arc<std::sync::Mutex<Option<tokio::runtime::Runtime>>>,
73
74 stats: Arc<std::sync::Mutex<ClientStatistics>>,
76
77 task_handles: Arc<std::sync::Mutex<Vec<JoinHandle<()>>>>,
78 pub client: Arc<std::sync::Mutex<ClientEnum>>,
80 user: Arc<std::sync::Mutex<Option<User>>>,
82 pub inner: Arc<Mutex<ClientInner>>,
84 pub config: Arc<std::sync::Mutex<Option<Config>>>,
86 pub auto_reconnect: Arc<std::sync::Mutex<bool>>,
88 pub url: Arc<std::sync::Mutex<String>>,
90 pub username: Arc<std::sync::Mutex<String>>,
92 pub password: Arc<std::sync::Mutex<String>>,
94 pub jwt: Arc<std::sync::Mutex<String>>,
96 service_name: Arc<std::sync::Mutex<String>>,
97 agent_name: Arc<std::sync::Mutex<String>>,
98 agent_version: Arc<std::sync::Mutex<String>>,
99 event_sender: async_channel::Sender<ClientEvent>,
100 event_receiver: async_channel::Receiver<ClientEvent>,
101 out_envelope_sender: async_channel::Sender<Envelope>,
102 out_envelope_receiver: async_channel::Receiver<Envelope>,
103 rpc_reply_queue: Arc<tokio::sync::Mutex<Option<String>>>,
104 rpc_callback: Arc<tokio::sync::Mutex<Option<QueueCallbackFn>>>,
105
106 pub state: Arc<std::sync::Mutex<ClientState>>,
108 pub msgcount: Arc<std::sync::Mutex<i32>>,
110 pub reconnect_ms: Arc<std::sync::Mutex<i32>>,
112 pub default_timeout: Arc<std::sync::Mutex<tokio::time::Duration>>,
114}
115#[derive(Clone, Default)]
117pub struct ClientStatistics {
118 connection_attempts: u64,
119 connections: u64,
120 package_tx: u64,
121 package_rx: u64,
122 signin: u64,
123 download: u64,
124 getdocumentversion: u64,
125 customcommand: u64,
126 listcollections: u64,
127 createcollection: u64,
128 dropcollection: u64,
129 ensurecustomer: u64,
130 invokeopenrpa: u64,
131 registerqueue: u64,
132 registerexchange: u64,
133 unregisterqueue: u64,
134 watch: u64,
135 unwatch: u64,
136 queuemessage: u64,
137 pushworkitem: u64,
138 pushworkitems: u64,
139 popworkitem: u64,
140 updateworkitem: u64,
141 deleteworkitem: u64,
142 addworkitemqueue: u64,
143 updateworkitemqueue: u64,
144 deleteworkitemqueue: u64,
145 getindexes: u64,
146 createindex: u64,
147 dropindex: u64,
148 upload: u64,
149 query: u64,
150 count: u64,
151 distinct: u64,
152 aggregate: u64,
153 insertone: u64,
154 insertmany: u64,
155 insertorupdateone: u64,
156 insertorupdatemany: u64,
157 updateone: u64,
158 updatedocument: u64,
159 deleteone: u64,
160 deletemany: u64,
161}
162use futures::future::BoxFuture;
164type QueueCallbackFn =
165 Arc<dyn Fn(Arc<Client>, QueueEvent) -> BoxFuture<'static, Option<String>> + Send + Sync>;
166
167#[derive(Clone)]
170pub struct ClientInner {
171 pub queries: Arc<Mutex<std::collections::HashMap<String, QuerySender>>>,
173 pub streams: Arc<Mutex<std::collections::HashMap<String, StreamSender>>>,
175 #[allow(clippy::type_complexity)]
177 pub watches:
178 Arc<Mutex<std::collections::HashMap<String, Box<dyn Fn(WatchEvent) + Send + Sync>>>>,
179 #[allow(clippy::type_complexity)]
181 pub queues:
182 Arc<Mutex<std::collections::HashMap<String, QueueCallbackFn>>>,
183}
184#[derive(Clone, Debug)]
186pub enum ClientEnum {
187 None,
189 Grpc(FlowServiceClient<tonic::transport::Channel>),
191 WS(Arc<Mutex<Sock>>)
193}
194#[derive(Debug, Clone, PartialEq)]
196pub enum ClientEvent {
197 Connecting,
199 Connected,
201 Disconnected(String),
203 SignedIn,
205 }
218#[derive(Debug, Clone, PartialEq)]
220pub enum ClientState {
221 Disconnected,
223 Connecting,
225 Connected,
227 Signedin
229}
230impl Display for ClientState {
231 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
232 match self {
233 ClientState::Disconnected => write!(f, "Disconnected"),
234 ClientState::Connecting => write!(f, "Connecting"),
235 ClientState::Connected => write!(f, "Connected"),
236 ClientState::Signedin => write!(f, "Signedin"),
237 }
238 }
239}
240
241#[derive(Debug, Clone, serde::Deserialize)]
243#[allow(dead_code)]
244pub struct Config {
245 #[serde(default)]
246 wshost: String,
247 #[serde(default)]
248 wsurl: String,
249 #[serde(default)]
250 domain: String,
251 #[serde(default)]
252 auto_create_users: bool,
253 #[serde(default)]
254 namespace: String,
255 #[serde(default)]
256 agent_domain_schema: String,
257 #[serde(default)]
258 version: String,
259 #[serde(default)]
260 validate_emails: bool,
261 #[serde(default)]
262 forgot_pass_emails: bool,
263 #[serde(default)]
264 supports_watch: bool,
265 #[serde(default)]
266 amqp_enabled_exchange: bool,
267 #[serde(default)]
268 multi_tenant: bool,
269 #[serde(default)]
270 enable_entity_restriction: bool,
271 #[serde(default)]
272 enable_web_tours: bool,
273 #[serde(default)]
274 collections_with_text_index: Vec<String>,
275 #[serde(default)]
276 timeseries_collections: Vec<String>,
277 #[serde(default)]
278 ping_clients_interval: i32,
279 #[serde(default)]
280 validlicense: bool,
281 #[serde(default)]
282 forceddomains: Vec<String>,
283 #[serde(default)]
284 grafana_url: String,
285 #[serde(default)]
286 otel_metric_url: String,
287 #[serde(default)]
288 otel_trace_url: String,
289 #[serde(default)]
290 otel_log_url: String,
291 #[serde(default)]
292 enable_analytics: bool,
293}
294impl std::fmt::Debug for ClientInner {
295 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
296 f.debug_struct("ClientInner")
297 .field("queries", &self.queries)
299 .field("streams", &self.streams)
300 .finish()
301 }
302}
303impl Default for Client {
304 fn default() -> Self {
305 Self::new()
306 }
307}
308
309impl Client {
310 pub fn new() -> Self {
312 let (ces, cer) = unbounded::<ClientEvent>();
313 let (out_es, out_er) = unbounded::<Envelope>();
314 Self {
315 task_handles: Arc::new(std::sync::Mutex::new(Vec::new())),
316 stats: Arc::new(std::sync::Mutex::new(ClientStatistics::default())),
317 user: Arc::new(std::sync::Mutex::new(None)),
318 client: Arc::new(std::sync::Mutex::new(ClientEnum::None)),
319 connect_called: Arc::new(std::sync::Mutex::new(false)),
320 runtime: Arc::new(std::sync::Mutex::new(None)),
321 msgcount: Arc::new(std::sync::Mutex::new(-1)),
322 reconnect_ms: Arc::new(std::sync::Mutex::new(1000)),
323 rpc_reply_queue: Arc::new(tokio::sync::Mutex::new(None)),
324 rpc_callback: Arc::new(tokio::sync::Mutex::new(None)),
325 inner: Arc::new(Mutex::new(ClientInner {
326 queries: Arc::new(Mutex::new(std::collections::HashMap::new())),
327 streams: Arc::new(Mutex::new(std::collections::HashMap::new())),
328 watches: Arc::new(Mutex::new(std::collections::HashMap::new())),
329 queues: Arc::new(Mutex::new(std::collections::HashMap::new())),
330 })),
331 config: Arc::new(std::sync::Mutex::new(None)),
332 auto_reconnect: Arc::new(std::sync::Mutex::new(true)),
333 url: Arc::new(std::sync::Mutex::new("".to_string())),
334 username: Arc::new(std::sync::Mutex::new("".to_string())),
335 password: Arc::new(std::sync::Mutex::new("".to_string())),
336 jwt: Arc::new(std::sync::Mutex::new("".to_string())),
337 service_name: Arc::new(std::sync::Mutex::new("rust".to_string())),
338 agent_name: Arc::new(std::sync::Mutex::new("rust".to_string())),
339 agent_version: Arc::new(std::sync::Mutex::new(VERSION.to_string())),
340 event_sender: ces,
341 event_receiver: cer,
342 out_envelope_sender: out_es,
343 out_envelope_receiver: out_er,
344 state: Arc::new(std::sync::Mutex::new(ClientState::Disconnected)),
345 default_timeout: Arc::new(std::sync::Mutex::new(Duration::from_secs(30))),
346 }
347 }
348 #[tracing::instrument(skip_all)]
350 pub fn connect(&self, dst: &str) -> Result<(), OpenIAPError> {
351 let rt = match tokio::runtime::Runtime::new() {
352 Ok(rt) => rt,
353 Err(e) => {
354 return Err(OpenIAPError::ClientError(format!(
355 "Failed to create tokio runtime: {}",
356 e
357 )));
358 }
359 };
360 self.set_runtime(Some(rt));
361 tokio::task::block_in_place(|| {
362 let handle = self.get_runtime_handle();
363 handle.block_on(self.connect_async(dst))
364 })
365 }
366
367 #[allow(unused_variables)]
369 pub async fn load_config(&self, strurl: &str, url: &url::Url) -> Option<Config> {
370 let config: Option<Config>;
371 let issecure = url.scheme() == "https" || url.scheme() == "wss" || url.port() == Some(443);
372 let mut port = url.port().unwrap_or(80);
373 if issecure {
374 port = 443;
375 }
376 let mut host = url.host_str().unwrap_or("localhost.openiap.io").replace("grpc.", "");
377 if host.starts_with("api-grpc") {
378 host = "api".to_string();
379 }
380 if port == 50051 {
381 port = 3000;
382 }
383 let configurl = if issecure {
384 format!(
385 "{}://{}:{}/config",
386 "https",
387 host,
388 port
389 )
390 } else {
391 format!(
392 "{}://{}:{}/config",
393 "http",
394 host,
395 port
396 )
397 };
398
399 let configurl = url::Url::parse(configurl.as_str())
400 .map_err(|e| OpenIAPError::ClientError(format!("Failed to parse URL: {}", e))).expect("wefew");
401 trace!("Getting config from: {}", configurl);
402 let o = minreq::get(configurl).with_timeout(5).send();
403 match o {
404 Ok(_) => {
405 let response = match o {
406 Ok(response) => response,
407 Err(e) => {
408 error!("Failed to get config: {}", e);
409 return None;
410 }
411 };
412 if response.status_code == 200 {
413 let body = response.as_str().unwrap();
414 config = Some(match serde_json::from_str(body) {
415 Ok(config) => config,
416 Err(e) => {
417 error!("Failed to parse config: {}", e);
418 return None;
419 }
420 });
421 } else {
422 config = None;
423 }
424 }
425 Err(e) => {
426 error!("Failed to get config: {}", e);
427 return None;
428 }
429 }
430 let mut _enable_analytics = true;
431 let mut _otel_metric_url = std::env::var("OTEL_METRIC_URL").unwrap_or_default();
432 let mut _otel_trace_url = std::env::var("OTEL_TRACE_URL").unwrap_or_default();
433 let mut _otel_log_url = std::env::var("OTEL_LOG_URL").unwrap_or_default();
434 let mut apihostname = url.host_str().unwrap_or("localhost.openiap.io").to_string();
435 if apihostname.starts_with("grpc.") {
436 apihostname = apihostname[5..].to_string();
437 }
438
439 if config.is_some() {
440 let config = config.as_ref().unwrap();
441 if !config.otel_metric_url.is_empty() {
442 _otel_metric_url = config.otel_metric_url.clone();
443 }
444 if !config.otel_trace_url.is_empty() {
445 _otel_trace_url = config.otel_trace_url.clone();
446 }
447 if !config.otel_log_url.is_empty() {
448 _otel_log_url = config.otel_log_url.clone();
449 }
450 if !config.domain.is_empty() {
451 apihostname = config.domain.clone();
452 }
453 _enable_analytics = config.enable_analytics;
454 }
455 #[cfg(feature = "otel")]
456 if _enable_analytics {
457 let service_name = self.get_service_name();
458 let agent_name = self.get_agent_name();
459 let agent_version = self.get_agent_version();
460 match otel::init_telemetry(&service_name, &agent_name, &agent_version, VERSION, &apihostname, _otel_metric_url.as_str(),
461 _otel_trace_url.as_str(), _otel_log_url.as_str(),
462 &self.stats) {
463 Ok(_) => (),
464 Err(e) => {
465 error!("Failed to initialize telemetry: {}", e);
466 return None;
467 }
468 }
469 }
470 config
471 }
472
473 #[tracing::instrument(skip_all)]
475 pub async fn connect_async(&self, dst: &str) -> Result<(), OpenIAPError> {
476 #[cfg(test)]
477 {
478 enable_tracing("openiap=error", "");
482 }
484 if self.is_connect_called() {
485 self.set_auto_reconnect(true);
486 return self.reconnect().await;
487 }
488 let mut strurl = dst.to_string();
489 if strurl.is_empty() {
490 strurl = std::env::var("apiurl").unwrap_or("".to_string());
491 if strurl.is_empty() {
492 strurl = std::env::var("grpcapiurl").unwrap_or("".to_string());
493 }
494 if strurl.is_empty() {
495 strurl = std::env::var("wsapiurl").unwrap_or("".to_string());
496 }
497 }
498 if strurl.is_empty() {
499 return Err(OpenIAPError::ClientError("No URL provided".to_string()));
500 }
501 let url = url::Url::parse(strurl.as_str())
502 .map_err(|e| OpenIAPError::ClientError(format!("Failed to parse URL: {}", e)))?;
503 let mut username = "".to_string();
504 let mut password = "".to_string();
505 if let Some(p) = url.password() {
506 password = p.to_string();
507 }
508 if !url.username().is_empty() {
509 username = url.username().to_string();
510 }
511 if !username.is_empty() && !password.is_empty() {
512 self.set_username(&username);
513 self.set_password(&password);
514 }
515 let usegprc = url.scheme() == "grpc" || url.domain().unwrap_or("localhost").to_lowercase().starts_with("grpc.") || url.port() == Some(50051);
516 if url.scheme() != "http"
517 && url.scheme() != "https"
518 && url.scheme() != "grpc"
519 && url.scheme() != "ws"
520 && url.scheme() != "wss"
521 {
522 return Err(OpenIAPError::ClientError("Invalid URL scheme".to_string()));
523 }
524 if url.scheme() == "grpc" {
525 if url.port() == Some(443) {
526 strurl = format!("https://{}", url.host_str().unwrap_or("app.openiap.io"));
527 } else {
528 strurl = format!("http://{}:{}", url.host_str().unwrap_or("app.openiap.io"), url.port().unwrap_or(80));
529 }
530 }
531 let url = url::Url::parse(strurl.as_str())
532 .map_err(|e| OpenIAPError::ClientError(format!("Failed to parse URL: {}", e)))?;
533 if url.port().is_none() {
534 strurl = format!(
535 "{}://{}",
536 url.scheme(),
537 url.host_str().unwrap_or("app.openiap.io")
538 );
539 } else {
540 strurl = format!(
541 "{}://{}:{}",
542 url.scheme(),
543 url.host_str().unwrap_or("localhost.openiap.io"),
544 url.port().unwrap_or(80)
545 );
546 }
547 debug!("Connecting to {}", strurl);
548 let config = self.load_config(strurl.as_str(), &url).await;
549 if !usegprc {
550 strurl = format!("{}/ws/v2", strurl);
551
552 let (_stream_tx, stream_rx) = mpsc::channel(60);
553
554 let socket = match tokio_tungstenite::connect_async(strurl.clone()).await {
555 Ok((socket, _)) => socket,
556 Err(e) => {
557 return Err(OpenIAPError::ClientError(format!(
558 "Failed to connect to WS: {}",
559 e
560 )));
561 }
562 };
563 self.set_client(ClientEnum::WS(Arc::new(Mutex::new(socket))));
564 self.set_connect_called(true);
565 self.set_config(config);
566 self.set_url(&strurl);
567 match self.setup_ws(&strurl).await {
568 Ok(_) => (),
569 Err(e) => {
570 return Err(OpenIAPError::ClientError(format!(
571 "Failed to setup WS: {}",
572 e
573 )));
574 }
575 }
576 let client2 = self.clone();
577 tokio::task::spawn(async move {
579 tokio_stream::wrappers::ReceiverStream::new(stream_rx)
580 .for_each(|envelope: Envelope| async {
581 let command = envelope.command.clone();
582 let rid = envelope.rid.clone();
583 let id = envelope.id.clone();
584 trace!("Received command: {}, id: {}, rid: {}", command, id, rid);
585 client2.parse_incomming_envelope(envelope).await;
586 })
587 .await;
588 }); } else {
590 if url.scheme() == "http" {
591 let response = Client::connect_grpc(strurl.clone()).await;
592 match response {
593 Ok(client) => {
594 self.set_client(ClientEnum::Grpc(client));
595 }
596 Err(e) => {
597 return Err(OpenIAPError::ClientError(format!(
598 "Failed to connect: {}",
599 e
600 )));
601 }
602 }
603 } else {
604 let uri = tonic::transport::Uri::builder()
605 .scheme(url.scheme())
606 .authority(url.host().unwrap().to_string())
607 .path_and_query("/")
608 .build();
609 let uri = match uri {
610 Ok(uri) => uri,
611 Err(e) => {
612 return Err(OpenIAPError::ClientError(format!(
613 "Failed to build URI: {}",
614 e
615 )));
616 }
617 };
618 let channel = Channel::builder(uri)
619 .tls_config(tonic::transport::ClientTlsConfig::new().with_native_roots());
620 let channel = match channel {
621 Ok(channel) => channel,
622 Err(e) => {
623 return Err(OpenIAPError::ClientError(format!(
624 "Failed to build channel: {}",
625 e
626 )));
627 }
628 };
629 let channel = channel.connect().await;
630 let channel = match channel {
631 Ok(channel) => channel,
632 Err(e) => {
633 return Err(OpenIAPError::ClientError(format!(
634 "Failed to connect: {}",
635 e
636 )));
637 }
638 };
639 self.set_client(ClientEnum::Grpc(FlowServiceClient::new(channel)));
640 }
641 self.set_connect_called(true);
642 self.set_config(config);
643 self.set_url(&strurl);
644 self.setup_grpc_stream().await?;
645 };
646 self.post_connected().await
647 }
648
649 #[tracing::instrument(skip_all)]
678 pub async fn new_connect(dst: &str) -> Result<Self, OpenIAPError> {
679 #[cfg(test)]
680 {
681 enable_tracing("openiap=error", "");
682 }
683 let client = Client::new();
684 client.connect_async(dst).await?;
685 Ok(client)
686 }
687 pub async fn post_connected(&self) -> Result<(), OpenIAPError> {
689 if self.get_username().is_empty() && self.get_password().is_empty() {
690 self.set_username(&std::env::var("OPENIAP_USERNAME").unwrap_or_default());
691 self.set_password(&std::env::var("OPENIAP_PASSWORD").unwrap_or_default());
692 }
693 if !self.get_username().is_empty() && !self.get_password().is_empty() {
694 debug!("Signing in with username: {}", self.get_username());
695 let signin = SigninRequest::with_userpass(self.get_username().as_str(), self.get_password().as_str());
696 let loginresponse = self.signin(signin).await;
697 match loginresponse {
698 Ok(response) => {
699 self.reset_reconnect_ms();
700 self.set_connected(ClientState::Connected, None);
701 info!("Signed in as {}", response.user.as_ref().unwrap().username);
702 Ok(())
703 }
704 Err(_e) => {
705 self.set_connected(ClientState::Disconnected, Some(&_e.to_string()));
706 Err(_e)
707 }
708 }
709 } else {
710 self.set_jwt(&std::env::var("OPENIAP_JWT").unwrap_or_default());
711 if self.get_jwt().is_empty() {
712 self.set_jwt(&std::env::var("jwt").unwrap_or_default());
713 }
714 if !self.get_jwt().is_empty() {
715 debug!("Signing in with JWT");
716 let signin = SigninRequest::with_jwt(self.get_jwt().as_str());
717 let loginresponse = self.signin(signin).await;
718 match loginresponse {
719 Ok(response) => match response.user {
720 Some(user) => {
721 self.reset_reconnect_ms();
722 info!("Signed in as {}", user.username);
723 self.set_connected(ClientState::Connected, None);
724 Ok(())
725 }
726 None => {
727 self.reset_reconnect_ms();
728 info!("Signed in as guest");
729 self.set_connected(ClientState::Connected, None);
730 Ok(())
731 }
733 },
734 Err(_e) => {
735 self.set_connected(ClientState::Disconnected, Some(&_e.to_string()));
736 Err(_e)
737 }
738 }
739 } else {
740 self.reset_reconnect_ms();
741 match self.get_element().await {
742 Ok(_) => {
743 debug!("Connected, No credentials provided so is running as guest");
744 self.set_connected(ClientState::Connected, None);
745 Ok(())
746 },
747 Err(e) => {
748 self.set_connected(ClientState::Disconnected, Some(&e.to_string()));
749 Err(e)
750 }
751 }
752 }
753 }
754 }
755 #[tracing::instrument(skip_all)]
757 pub async fn reconnect(&self) -> Result<(), OpenIAPError> {
758 let state = self.get_state();
759 if state == ClientState::Connected || state == ClientState::Signedin {
760 return Ok(());
761 }
762 if !self.is_auto_reconnect() {
763 return Ok(());
764 }
765 let client = self.get_client();
766
767 match client {
768 ClientEnum::WS(ref _client) => {
769 info!("Reconnecting to {} ({} ms)", self.get_url(), (self.get_reconnect_ms() - 500));
770 self.setup_ws(&self.get_url()).await?;
771 debug!("Completed reconnecting to websocket");
772 self.post_connected().await
773 }
774 ClientEnum::Grpc(ref _client) => {
775 info!("Reconnecting to {} ({} ms)", self.get_url(), (self.get_reconnect_ms() - 500));
776 match self.setup_grpc_stream().await {
777 Ok(_) => {
778 debug!("Completed reconnecting to gRPC");
779 self.post_connected().await
780 },
781 Err(e) => {
782 return Err(OpenIAPError::ClientError(format!(
783 "Failed to setup gRPC stream: {}",
784 e
785 )));
786 }
787 }
788 }
789 ClientEnum::None => {
790 return Err(OpenIAPError::ClientError("Invalid client".to_string()));
791 }
792 }
793 }
794 pub fn disconnect(&self) {
796 self.set_auto_reconnect(false);
797 self.set_connected(ClientState::Disconnected, Some("Disconnected"));
798 }
799 pub fn set_connected(&self, state: ClientState, message: Option<&str>) {
801 {
802 let current = self.get_state();
803 trace!("Set connected: {:?} from {:?}", state, current);
804 if state == ClientState::Connected && current == ClientState::Signedin {
805 self.set_state(ClientState::Signedin);
806 } else {
807 self.set_state(state.clone());
808 }
809 if state == ClientState::Disconnected && !current.eq(&state) {
810 let me = self.clone();
811 if let Ok(_handle) = tokio::runtime::Handle::try_current() {
812 tokio::task::spawn(async move {
813 let mut reply_queue_guard = me.rpc_reply_queue.lock().await;
814 let mut callback_guard = me.rpc_callback.lock().await;
815 *reply_queue_guard = None;
816 *callback_guard = None;
817 });
818 }
819 }
820 if state == ClientState::Connecting && !current.eq(&state) {
821 if let Ok(_handle) = tokio::runtime::Handle::try_current() {
822 self.stats.lock().unwrap().connection_attempts += 1;
823 let me = self.clone();
824 tokio::task::spawn(async move {
825 me.event_sender.send(crate::ClientEvent::Connecting).await.unwrap();
826 });
827 }
828
829 }
830 if (state == ClientState::Connected|| state == ClientState::Signedin) && (current == ClientState::Disconnected || current == ClientState::Connecting) {
831 if let Ok(_handle) = tokio::runtime::Handle::try_current() {
832 self.stats.lock().unwrap().connections += 1;
833 let me = self.clone();
834 tokio::task::spawn(async move {
835 me.event_sender.send(crate::ClientEvent::Connected).await.unwrap();
836 });
837 }
838 }
839 if state == ClientState::Signedin && current != ClientState::Signedin {
840 if let Ok(_handle) = tokio::runtime::Handle::try_current() {
841 let me = self.clone();
842 tokio::task::spawn(async move {
843 me.event_sender.send(crate::ClientEvent::SignedIn).await.unwrap();
844 });
845 }
846 }
847 if state == ClientState::Disconnected && !current.eq(&state) {
848 if message.is_some() {
849 debug!("Disconnected: {}", message.unwrap());
850 } else {
851 debug!("Disconnected");
852 }
853 if let Ok(_handle) = tokio::runtime::Handle::try_current() {
854 let me = self.clone();
855 let message = match message {
856 Some(message) => message.to_string(),
857 None => "".to_string(),
858 };
859 tokio::task::spawn(async move {
860 me.event_sender.send(crate::ClientEvent::Disconnected(message)).await.unwrap();
861 });
862 }
863
864 self.kill_handles();
865 if let Ok(_handle) = tokio::runtime::Handle::try_current() {
866 let client = self.clone();
867 tokio::task::spawn(async move {
868 {
869 let inner = client.inner.lock().await;
870 let mut queries = inner.queries.lock().await;
871 let ids = queries.keys().cloned().collect::<Vec<String>>();
872 debug!("********************************************** Cleaning up");
873 for id in ids {
874 let err = ErrorResponse {
875 code: 500,
876 message: "Disconnected".to_string(),
877 stack: "".to_string(),
878 };
879 let envelope = err.to_envelope();
880 let tx = queries.remove(&id).unwrap();
881 debug!("kill query: {}", id);
882 let _ = tx.send(envelope);
883 }
884 let mut streams = inner.streams.lock().await;
885 let ids = streams.keys().cloned().collect::<Vec<String>>();
886 for id in ids {
887 let tx = streams.remove(&id).unwrap();
888 debug!("kill stream: {}", id);
889 let _ = tx.send(Vec::new()).await;
890 }
891 let mut queues = inner.queues.lock().await;
892 let ids = queues.keys().cloned().collect::<Vec<String>>();
893 for id in ids {
894 let _ = queues.remove(&id).unwrap();
895 }
896 let mut watches = inner.watches.lock().await;
897 let ids = watches.keys().cloned().collect::<Vec<String>>();
898 for id in ids {
899 let _ = watches.remove(&id).unwrap();
900 }
901 debug!("**********************************************************");
902 }
903 if client.is_auto_reconnect() {
904 trace!("Reconnecting in {} seconds", client.get_reconnect_ms() / 1000);
905 tokio::time::sleep(Duration::from_millis(client.get_reconnect_ms() as u64)).await;
906 if client.is_auto_reconnect() {
907 client.inc_reconnect_ms();
908 trace!("Reconnecting . . .");
910 client.reconnect().await.unwrap_or_else(|e| {
911 error!("Failed to reconnect: {}", e);
912 client.set_connected(ClientState::Disconnected, Some(&e.to_string()));
913 });
914 } else {
915 debug!("Not reconnecting");
916 }
917 } else {
918 debug!("Reconnecting disabled, stop now");
919 }
920 });
921 }
922
923 }
924 }
925 }
926 pub fn get_state(&self) -> ClientState {
928 let conn = self.state.lock().unwrap();
929 conn.clone()
930 }
931 pub fn set_state(&self, state: ClientState) {
933 let mut conn = self.state.lock().unwrap();
934 *conn = state;
935 }
936 pub fn set_msgcount(&self, msgcount: i32) {
938 let mut current = self.msgcount.lock().unwrap();
939 trace!("Set msgcount: {} from {}", msgcount, *current);
940 *current = msgcount;
941 }
942 pub fn inc_msgcount(&self) -> i32 {
944 let mut current = self.msgcount.lock().unwrap();
945 *current += 1;
946 *current
947 }
948 pub fn get_reconnect_ms(&self) -> i32 {
950 let reconnect_ms = self.reconnect_ms.lock().unwrap();
951 *reconnect_ms
952 }
953 pub fn reset_reconnect_ms(&self) {
955 let mut current = self.reconnect_ms.lock().unwrap();
956 *current = 500;
957 }
958 pub fn inc_reconnect_ms(&self) -> i32 {
960 let mut current = self.reconnect_ms.lock().unwrap();
961 if *current < 30000 {
962 *current += 500;
963 }
964 *current
965 }
966
967 pub fn push_handle(&self, handle: tokio::task::JoinHandle<()>) {
969 let mut handles = self.task_handles.lock().unwrap();
970 handles.push(handle);
971 }
972 pub fn kill_handles(&self) {
974 let mut handles = self.task_handles.lock().unwrap();
975 for handle in handles.iter() {
976 debug!("Killing handle");
979 if !handle.is_finished() {
980 handle.abort();
981 }
982 }
983 handles.clear();
984 }
991
992
993 #[tracing::instrument(skip_all)]
995 fn get_msgcount(&self) -> i32 {
996 let msgcount = self.msgcount.lock().unwrap();
997 *msgcount
998 }
999 pub fn set_default_timeout(&self, timeout: Duration) {
1001 let mut current = self.default_timeout.lock().unwrap();
1002 trace!("Set default_timeout: {} from {:?}", timeout.as_secs(), current.as_secs());
1003 *current = timeout;
1004 }
1005 pub fn get_default_timeout(&self) -> Duration {
1007 let current = self.default_timeout.lock().unwrap();
1008 current.clone()
1009 }
1010 #[tracing::instrument(skip_all)]
1012 pub fn set_connect_called(&self, connect_called: bool) {
1013 let mut current = self.connect_called.lock().unwrap();
1014 trace!("Set connect_called: {} from {}", connect_called, *current);
1015 *current = connect_called;
1016 }
1017 #[tracing::instrument(skip_all)]
1019 fn is_connect_called(&self) -> bool {
1020 let connect_called = self.connect_called.lock().unwrap();
1021 *connect_called
1022 }
1023 #[tracing::instrument(skip_all)]
1025 pub fn set_auto_reconnect(&self, auto_reconnect: bool) {
1026 let mut current = self.auto_reconnect.lock().unwrap();
1027 trace!("Set auto_reconnect: {} from {}", auto_reconnect, *current);
1028 *current = auto_reconnect;
1029 }
1030 #[tracing::instrument(skip_all)]
1032 fn is_auto_reconnect(&self) -> bool {
1033 let auto_reconnect = self.auto_reconnect.lock().unwrap();
1034 *auto_reconnect
1035 }
1036 #[tracing::instrument(skip_all)]
1038 pub fn set_url(&self, url: &str) {
1039 let mut current = self.url.lock().unwrap();
1040 trace!("Set url: {} from {}", url, *current);
1041 *current = url.to_string();
1042 }
1043 #[tracing::instrument(skip_all)]
1045 fn get_url(&self) -> String {
1046 let url = self.url.lock().unwrap();
1047 url.to_string()
1048 }
1049 #[tracing::instrument(skip_all)]
1051 pub fn set_username(&self, username: &str) {
1052 let mut current = self.username.lock().unwrap();
1053 trace!("Set username: {} from {}", username, *current);
1054 *current = username.to_string();
1055 }
1056 #[tracing::instrument(skip_all)]
1058 fn get_username(&self) -> String {
1059 let username = self.username.lock().unwrap();
1060 username.to_string()
1061 }
1062 #[tracing::instrument(skip_all)]
1064 pub fn set_password(&self, password: &str) {
1065 let mut current = self.password.lock().unwrap();
1066 trace!("Set password: {} from {}", password, *current);
1067 *current = password.to_string();
1068 }
1069 #[tracing::instrument(skip_all)]
1071 fn get_password(&self) -> String {
1072 let password = self.password.lock().unwrap();
1073 password.to_string()
1074 }
1075 #[tracing::instrument(skip_all)]
1077 pub fn set_jwt(&self, jwt: &str) {
1078 let mut current = self.jwt.lock().unwrap();
1079 trace!("Set jwt: {} from {}", jwt, *current);
1080 *current = jwt.to_string();
1081 }
1082 #[tracing::instrument(skip_all)]
1084 fn get_jwt(&self) -> String {
1085 let jwt = self.jwt.lock().unwrap();
1086 jwt.to_string()
1087 }
1088
1089 #[tracing::instrument(skip_all)]
1091 pub fn set_service_name(&self, service_name: &str) {
1092 let mut current = self.service_name.lock().unwrap();
1093 trace!("Set servicename: {} from {}", service_name, *current);
1094 *current = service_name.to_string();
1095 }
1096 #[tracing::instrument(skip_all)]
1098 pub fn get_service_name(&self) -> String {
1099 let servicename = self.service_name.lock().unwrap();
1100 servicename.to_string()
1101 }
1102 #[tracing::instrument(skip_all)]
1104 pub fn set_agent_name(&self, agent: &str) {
1105 let mut current = self.agent_name.lock().unwrap();
1106 trace!("Set agent: {} from {}", agent, *current);
1107 *current = agent.to_string();
1108 }
1109 #[tracing::instrument(skip_all)]
1111 pub fn get_agent_name(&self) -> String {
1112 let agent = self.agent_name.lock().unwrap();
1113 agent.to_string()
1114 }
1115 #[tracing::instrument(skip_all)]
1117 pub fn set_agent_version(&self, version: &str) {
1118 let mut current = self.agent_version.lock().unwrap();
1119 trace!("Set agent version: {} from {}", version, *current);
1120 *current = version.to_string();
1121 }
1122 #[tracing::instrument(skip_all)]
1124 pub fn get_agent_version(&self) -> String {
1125 let agent_version = self.agent_version.lock().unwrap();
1126 agent_version.to_string()
1127 }
1128
1129 #[tracing::instrument(skip_all)]
1131 pub fn set_config(&self, config: Option<Config>) {
1132 let mut current = self.config.lock().unwrap();
1133 *current = config;
1134 }
1135 #[tracing::instrument(skip_all)]
1137 pub fn get_config(&self) -> Option<Config> {
1138 let config = self.config.lock().unwrap();
1139 config.clone()
1140 }
1141 #[tracing::instrument(skip_all)]
1143 pub fn set_client(&self, client: ClientEnum) {
1144 let mut current = self.client.lock().unwrap();
1145 *current = client;
1146 }
1147 #[tracing::instrument(skip_all)]
1149 fn get_client(&self) -> ClientEnum {
1150 let client = self.client.lock().unwrap();
1151 client.clone()
1152 }
1153 #[tracing::instrument(skip_all)]
1155 pub fn set_user(&self, user: Option<User>) {
1156 let mut current = self.user.lock().unwrap();
1157 *current = user;
1158 }
1159 #[tracing::instrument(skip_all)]
1161 pub fn get_user(&self) -> Option<User> {
1162 let user = self.user.lock().unwrap();
1163 user.clone()
1164 }
1165 #[tracing::instrument(skip_all)]
1175 pub fn set_runtime(&self, runtime: Option<tokio::runtime::Runtime>) {
1176 let mut current = self.runtime.lock().unwrap();
1177 *current = runtime;
1178 }
1179 #[tracing::instrument(skip_all)]
1181 pub fn get_runtime(&self) -> &std::sync::Mutex<std::option::Option<tokio::runtime::Runtime>> {
1183 self.runtime.as_ref()
1184 }
1185 #[tracing::instrument(skip_all)]
1187 pub fn get_runtime_handle(&self) -> tokio::runtime::Handle {
1188 let mut rt = self.runtime.lock().unwrap();
1189 if rt.is_none() {
1190 let runtime = tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime");
1192 *rt = Some(runtime);
1193 } else {
1194 }
1196 rt.as_ref().unwrap().handle().clone()
1197 }
1198 #[tracing::instrument(skip_all)]
1200 pub async fn on_event(&self, callback: Box<dyn Fn(ClientEvent) + Send + Sync>)
1201 {
1202 let event_receiver = self.event_receiver.clone();
1204 let callback = callback;
1205 let _handle = tokio::task::spawn(async move {
1206 while let Ok(event) = event_receiver.recv().await {
1207 callback(event);
1208 }
1209 }); }
1211 #[tracing::instrument(skip_all)]
1213 pub fn get_uniqueid() -> String {
1214 static COUNTER: AtomicUsize = AtomicUsize::new(1);
1215 let num1 = COUNTER.fetch_add(1, Ordering::Relaxed) as u64;
1216 let num2 = COUNTER.fetch_add(1, Ordering::Relaxed) as u64;
1217 let num3 = COUNTER.fetch_add(1, Ordering::Relaxed) as u64;
1218 let sqids = Sqids::default();
1219 sqids.encode(&[num1, num2, num3 ]).unwrap().to_string()
1220 }
1221 #[tracing::instrument(skip_all)]
1223 async fn send(&self, msg: Envelope, timeout: Option<tokio::time::Duration>) -> Result<Envelope, OpenIAPError> {
1224 let response = self.send_noawait(msg).await;
1225 match response {
1226 Ok((response_rx, id)) => {
1227 let timeout = match timeout {
1228 Some(t) => t,
1229 None => self.get_default_timeout()
1230 };
1231 let result = tokio::time::timeout(timeout, response_rx).await;
1232 let inner = self.inner.lock().await;
1234 inner.queries.lock().await.remove(&id);
1235
1236 match result {
1237 Ok(Ok(response)) => Ok(response),
1238 Ok(Err(e)) => Err(OpenIAPError::CustomError(e.to_string())),
1239 Err(_) => Err(OpenIAPError::ClientError("Request timed out".to_string())),
1240 }
1241 }
1254 Err(e) => Err(OpenIAPError::CustomError(e.to_string())),
1255 }
1256 }
1257 #[tracing::instrument(skip_all)]
1260 async fn send_noawait(
1261 &self,
1262 mut msg: Envelope,
1263 ) -> Result<(oneshot::Receiver<Envelope>, String), OpenIAPError> {
1264 let (response_tx, response_rx) = oneshot::channel();
1265 let id = Client::get_uniqueid();
1266 msg.id = id.clone();
1267
1268 {
1270 let inner = self.inner.lock().await;
1271 inner.queries.lock().await.insert(id.clone(), response_tx);
1272 }
1273
1274 let res = self.send_envelope(msg).await;
1276 if let Err(e) = res {
1277 let inner = self.inner.lock().await;
1279 inner.queries.lock().await.remove(&id);
1280 return Err(OpenIAPError::ClientError(e.to_string()));
1281 }
1282
1283 Ok((response_rx, id))
1284 }
1285 #[tracing::instrument(skip_all)]
1287 async fn sendwithstream(
1288 &self,
1289 mut msg: Envelope,
1290 ) -> Result<(oneshot::Receiver<Envelope>, mpsc::Receiver<Vec<u8>>), OpenIAPError> {
1291 let (response_tx, response_rx) = oneshot::channel();
1292 let (stream_tx, stream_rx) = mpsc::channel(1024 * 1024);
1293 let id = Client::get_uniqueid();
1294 msg.id = id.clone();
1295 {
1296 let inner = self.inner.lock().await;
1297 inner.queries.lock().await.insert(id.clone(), response_tx);
1298 inner.streams.lock().await.insert(id.clone(), stream_tx);
1299 let res = self.send_envelope(msg).await;
1300 match res {
1301 Ok(_) => (),
1302 Err(e) => return Err(OpenIAPError::ClientError(e.to_string())),
1303 }
1304 }
1305 Ok((response_rx, stream_rx))
1306 }
1307 #[tracing::instrument(skip_all, target = "openiap::client")]
1308 async fn send_envelope(&self, mut envelope: Envelope) -> Result<(), OpenIAPError> {
1309 if (self.get_state() != ClientState::Connected && self.get_state() != ClientState::Signedin )
1310 && envelope.command != "signin" && envelope.command != "getelement" && envelope.command != "pong" {
1311 return Err(OpenIAPError::ClientError(format!("Not connected ( {:?} )", self.get_state())));
1312 }
1313 let env = envelope.clone();
1314 let command = envelope.command.clone();
1315 self.stats.lock().unwrap().package_tx += 1;
1316 match command.as_str() {
1317 "signin" => { self.stats.lock().unwrap().signin += 1;},
1318 "upload" => { self.stats.lock().unwrap().upload += 1;},
1319 "download" => { self.stats.lock().unwrap().download += 1;},
1320 "getdocumentversion" => { self.stats.lock().unwrap().getdocumentversion += 1;},
1321 "customcommand" => { self.stats.lock().unwrap().customcommand += 1;},
1322 "listcollections" => { self.stats.lock().unwrap().listcollections += 1;},
1323 "createcollection" => { self.stats.lock().unwrap().createcollection += 1;},
1324 "dropcollection" => { self.stats.lock().unwrap().dropcollection += 1;},
1325 "ensurecustomer" => { self.stats.lock().unwrap().ensurecustomer += 1;},
1326 "invokeopenrpa" => { self.stats.lock().unwrap().invokeopenrpa += 1;},
1327
1328 "registerqueue" => { self.stats.lock().unwrap().registerqueue += 1;},
1329 "registerexchange" => { self.stats.lock().unwrap().registerexchange += 1;},
1330 "unregisterqueue" => { self.stats.lock().unwrap().unregisterqueue += 1;},
1331 "watch" => { self.stats.lock().unwrap().watch += 1;},
1332 "unwatch" => { self.stats.lock().unwrap().unwatch += 1;},
1333 "queuemessage" => { self.stats.lock().unwrap().queuemessage += 1;},
1334
1335 "pushworkitem" => { self.stats.lock().unwrap().pushworkitem += 1;},
1336 "pushworkitems" => { self.stats.lock().unwrap().pushworkitems += 1;},
1337 "popworkitem" => { self.stats.lock().unwrap().popworkitem += 1;},
1338 "updateworkitem" => { self.stats.lock().unwrap().updateworkitem += 1;},
1339 "deleteworkitem" => { self.stats.lock().unwrap().deleteworkitem += 1;},
1340 "addworkitemqueue" => { self.stats.lock().unwrap().addworkitemqueue += 1;},
1341 "updateworkitemqueue" => { self.stats.lock().unwrap().updateworkitemqueue += 1;},
1342 "deleteworkitemqueue" => { self.stats.lock().unwrap().deleteworkitemqueue += 1;},
1343
1344 "getindexes" => { self.stats.lock().unwrap().getindexes += 1;},
1345 "createindex" => { self.stats.lock().unwrap().createindex += 1;},
1346 "dropindex" => { self.stats.lock().unwrap().dropindex += 1;},
1347 "query" => { self.stats.lock().unwrap().query += 1;},
1348 "count" => { self.stats.lock().unwrap().count += 1;},
1349 "distinct" => { self.stats.lock().unwrap().distinct += 1;},
1350 "aggregate" => { self.stats.lock().unwrap().aggregate += 1;},
1351 "insertone" => { self.stats.lock().unwrap().insertone += 1;},
1352 "insertmany" => { self.stats.lock().unwrap().insertmany += 1;},
1353 "updateone" => { self.stats.lock().unwrap().updateone += 1;},
1354 "insertorupdateone" => { self.stats.lock().unwrap().insertorupdateone += 1;},
1355 "insertorupdatemany" => { self.stats.lock().unwrap().insertorupdatemany += 1;},
1356 "updatedocument" => { self.stats.lock().unwrap().updatedocument += 1;},
1357 "deleteone" => { self.stats.lock().unwrap().deleteone += 1;},
1358 "deletemany" => { self.stats.lock().unwrap().deletemany += 1;},
1359 _ => {}
1360 };
1361 if envelope.id.is_empty() {
1362 let id = Client::get_uniqueid();
1363 envelope.id = id.clone();
1364 }
1365 trace!("Sending {} message, in the thread", command);
1366 let res = self.out_envelope_sender.send(env).await;
1367 if res.is_err() {
1368 error!("{:?}", res);
1369 let errmsg = res.unwrap_err().to_string();
1370 self.set_connected(ClientState::Disconnected, Some(&errmsg));
1371 return Err(OpenIAPError::ClientError(format!("Failed to send data: {}", errmsg)))
1372 } else {
1373 return Ok(())
1374 }
1375 }
1376 #[tracing::instrument(skip_all, target = "openiap::client")]
1377 async fn parse_incomming_envelope(&self, received: Envelope) {
1378 self.stats.lock().unwrap().package_rx += 1;
1379 let command = received.command.clone();
1380 trace!("parse_incomming_envelope, command: {}", command);
1381 let inner = self.inner.lock().await;
1382 let rid = received.rid.clone();
1383 let mut queries = inner.queries.lock().await;
1384 let mut streams = inner.streams.lock().await;
1385 let watches = inner.watches.lock().await;
1386 let queues = inner.queues.lock().await;
1387
1388 if command != "ping" && command != "pong" && command != "refreshtoken" {
1389 if rid.is_empty() {
1390 debug!("Received #{} #{} {} message", received.seq, received.id, command);
1391 } else {
1392 debug!("Received #{} #{} (reply to #{}) {} message", received.seq, received.id, rid, command);
1393 }
1394 } else if rid.is_empty() {
1395 trace!("Received #{} #{} {} message", received.seq, received.id, command);
1396 } else {
1397 trace!("Received #{} #{} (reply to #{}) {} message", received.seq, received.id, rid, command);
1398 }
1399
1400 if command == "ping" {
1401 self.pong(&received.id).await;
1402 } else if command == "refreshtoken" {
1404 } else if command == "beginstream"
1406 || command == "stream"
1407 || command == "endstream"
1408 {
1409 let streamresponse: Stream =
1410 prost::Message::decode(received.data.unwrap().value.as_ref()).unwrap();
1411 let streamdata = streamresponse.data;
1412 if !streamdata.is_empty() {
1413 let stream = streams.get(rid.as_str()).unwrap();
1414
1415 match stream.send(streamdata).await {
1416 Ok(_) => _ = (),
1417 Err(e) => error!("Failed to send data: {}", e),
1418 }
1419 }
1420 if command == "endstream" {
1421 let _ = streams.remove(rid.as_str());
1422 }
1423 } else if command == "watchevent" {
1424 let watchevent: WatchEvent =
1425 prost::Message::decode(received.data.unwrap().value.as_ref()).unwrap();
1426 if let Some(callback) = watches.get(watchevent.id.as_str()) {
1427 callback(watchevent);
1428 }
1429 } else if command == "queueevent" {
1430 let queueevent: QueueEvent =
1431 prost::Message::decode(received.data.unwrap().value.as_ref()).unwrap();
1432 if let Some(callback) = queues.get(queueevent.queuename.as_str()).cloned() {
1433 let queuename = queueevent.replyto.clone();
1434 let correlation_id = queueevent.correlation_id.clone();
1435 let me = self.clone();
1436 tokio::spawn(async move {
1437 let result_fut = callback(Arc::new(me.clone()), queueevent);
1438 let result = result_fut.await;
1439 if result.is_some() && !queuename.is_empty() {
1440 debug!("Sending return value from queue event callback to {}", queuename);
1441 let result = result.unwrap();
1442 let q = QueueMessageRequest {
1443 queuename,
1444 correlation_id,
1445 data: result,
1446 striptoken: true,
1447 ..Default::default()
1448 };
1449 let e = q.to_envelope();
1450 let send_result = me.send(e, None).await;
1451 if let Err(e) = send_result {
1452 error!("Failed to send queue event response: {}", e);
1453 }
1454 }
1455 });
1456 }
1457 } else if let Some(response_tx) = queries.remove(&rid) {
1458 let stream = streams.get(rid.as_str());
1459 if let Some(stream) = stream {
1460 let streamdata = vec![];
1461 match stream.send(streamdata).await {
1462 Ok(_) => _ = (),
1463 Err(e) => error!("Failed to send data: {}", e),
1464 }
1465 }
1466 let _ = response_tx.send(received);
1467 } else {
1468 error!("Received unhandled {} message: {:?}", command, received);
1469 }
1470 }
1471 #[tracing::instrument(skip_all)]
1473 async fn get_element(&self) -> Result<(), OpenIAPError> {
1474 let id = Client::get_uniqueid();
1475 let envelope = Envelope {
1476 id: id.clone(),
1477 command: "getelement".into(),
1478 ..Default::default()
1479 };
1480 let result = match self.send(envelope, None).await {
1481 Ok(res) => res,
1482 Err(e) => {
1483 return Err(e);
1484 },
1485 };
1486 if result.command == "pong" || result.command == "getelement" {
1487 Ok(())
1488 } else if result.command == "error" {
1489 let e: ErrorResponse = prost::Message::decode(result.data.unwrap().value.as_ref()).unwrap();
1490 Err(OpenIAPError::ServerError(e.message))
1491 } else {
1492 Err(OpenIAPError::ClientError("Failed to receive getelement".to_string()))
1493 }
1494 }
1495 #[tracing::instrument(skip_all)]
1497 async fn ping(&self) -> Result<(), OpenIAPError> {
1498 let id = Client::get_uniqueid();
1499 let envelope = Envelope {
1500 id: id.clone(),
1501 command: "getelement".into(),
1502 ..Default::default()
1503 };
1504 match self.send_envelope(envelope).await {
1505 Ok(_res) => Ok(()),
1506 Err(e) => {
1507 return Err(e);
1508 },
1509 }
1510 }
1511 #[tracing::instrument(skip_all)]
1513 async fn pong(&self, rid: &str) {
1514 let id = Client::get_uniqueid();
1515 let envelope = Envelope {
1516 id: id.clone(),
1517 command: "pong".into(),
1518 rid: rid.to_string(),
1519 ..Default::default()
1520 };
1521 match self.send_envelope(envelope).await {
1522 Ok(_) => (),
1523 Err(e) => error!("Failed to send pong: {}", e),
1524 }
1525 }
1526 #[tracing::instrument(skip_all)]
1532 pub async fn signin(&self, mut config: SigninRequest) -> Result<SigninResponse, OpenIAPError> {
1533 if config.username.is_empty() && config.password.is_empty() && config.jwt.is_empty() {
1535 if config.jwt.is_empty() {
1536 config.jwt = std::env::var("OPENIAP_JWT").unwrap_or_default();
1537 }
1538 if config.jwt.is_empty() {
1539 config.jwt = std::env::var("jwt").unwrap_or_default();
1540 }
1541 if config.jwt.is_empty() {
1543 if config.username.is_empty() {
1544 config.username = std::env::var("OPENIAP_USERNAME").unwrap_or_default();
1545 }
1546 if config.password.is_empty() {
1547 config.password = std::env::var("OPENIAP_PASSWORD").unwrap_or_default();
1548 }
1549 }
1550 }
1551 let version = env!("CARGO_PKG_VERSION");
1552 if !version.is_empty() && config.version.is_empty() {
1553 config.version = version.to_string();
1554 }
1555 if config.agent.is_empty() {
1556 config.agent = self.get_agent_name();
1557 }
1558
1559 let envelope = config.to_envelope();
1561 let result = self.send(envelope, None).await;
1562
1563 match &result {
1564 Ok(m) => {
1565 debug!("Sign-in reply received");
1566 if m.command == "error" {
1567 let e: ErrorResponse =
1568 prost::Message::decode(m.data.as_ref().unwrap().value.as_ref())
1569 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1570 return Err(OpenIAPError::ServerError(e.message));
1571 }
1572 debug!("Sign-in successful");
1573 let response: SigninResponse =
1574 prost::Message::decode(m.data.as_ref().unwrap().value.as_ref())
1575 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1576 if !config.validateonly {
1577 self.set_connected(ClientState::Signedin, None);
1578 self.set_user(Some(response.user.as_ref().unwrap().clone()));
1579 }
1580 Ok(response)
1581 }
1582 Err(e) => {
1583 debug!("Sending Sign-in request failed {:?}", result);
1584 debug!("Sign-in failed: {}", e.to_string());
1585 if !config.validateonly {
1586 self.set_user(None);
1587 }
1588 Err(OpenIAPError::ClientError(e.to_string()))
1589 }
1590 }
1591 }
1592 #[tracing::instrument(skip_all)]
1596 pub async fn list_collections(&self, includehist: bool) -> Result<String, OpenIAPError> {
1597 let config = ListCollectionsRequest::new(includehist);
1598 let envelope = config.to_envelope();
1599 let result = self.send(envelope, None).await;
1600 match result {
1601 Ok(m) => {
1602 let data = match m.data {
1603 Some(data) => data,
1604 None => {
1605 return Err(OpenIAPError::ClientError("No data returned".to_string()));
1606 }
1607 };
1608 if m.command == "error" {
1609 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1610 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1611 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1612 }
1613 let response: ListCollectionsResponse = prost::Message::decode(data.value.as_ref())
1614 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1615 Ok(response.results)
1616 }
1617 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1618 }
1619 }
1620 #[tracing::instrument(skip_all)]
1672 pub async fn create_collection(
1673 &self,
1674 config: CreateCollectionRequest,
1675 ) -> Result<(), OpenIAPError> {
1676 if config.collectionname.is_empty() {
1677 return Err(OpenIAPError::ClientError(
1678 "No collection name provided".to_string(),
1679 ));
1680 }
1681 let envelope = config.to_envelope();
1682 let result = self.send(envelope, None).await;
1683 match result {
1684 Ok(m) => {
1685 let data = match m.data {
1686 Some(data) => data,
1687 None => {
1688 return Err(OpenIAPError::ClientError("No data returned".to_string()));
1689 }
1690 };
1691 if m.command == "error" {
1692 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1693 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1694 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1695 }
1696 Ok(())
1697 }
1698 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1699 }
1700 }
1701 #[tracing::instrument(skip_all)]
1704 pub async fn drop_collection(&self, config: DropCollectionRequest) -> Result<(), OpenIAPError> {
1705 if config.collectionname.is_empty() {
1706 return Err(OpenIAPError::ClientError(
1707 "No collection name provided".to_string(),
1708 ));
1709 }
1710 let envelope = config.to_envelope();
1711 let result = self.send(envelope, None).await;
1712 match result {
1713 Ok(m) => {
1714 let data = match m.data {
1715 Some(data) => data,
1716 None => {
1717 return Err(OpenIAPError::ClientError("No data returned".to_string()));
1718 }
1719 };
1720 if m.command == "error" {
1721 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1722 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1723 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1724 }
1725 Ok(())
1726 }
1727 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1728 }
1729 }
1730 pub async fn get_indexes(&self, config: GetIndexesRequest) -> Result<String, OpenIAPError> {
1744 if config.collectionname.is_empty() {
1745 return Err(OpenIAPError::ClientError(
1746 "No collection name provided".to_string(),
1747 ));
1748 }
1749 let envelope = config.to_envelope();
1750 let result = self.send(envelope, None).await;
1751 match result {
1752 Ok(m) => {
1753 let data = match m.data {
1754 Some(data) => data,
1755 None => {
1756 return Err(OpenIAPError::ClientError("No data returned".to_string()));
1757 }
1758 };
1759 if m.command == "error" {
1760 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1761 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1762 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1763 }
1764 let response: GetIndexesResponse = prost::Message::decode(data.value.as_ref())
1765 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1766 Ok(response.results)
1767 }
1768 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1769 }
1770 }
1771 pub async fn create_index(&self, config: CreateIndexRequest) -> Result<(), OpenIAPError> {
1812 if config.collectionname.is_empty() {
1813 return Err(OpenIAPError::ClientError(
1814 "No collection name provided".to_string(),
1815 ));
1816 }
1817 if config.index.is_empty() {
1818 return Err(OpenIAPError::ClientError(
1819 "No index was provided".to_string(),
1820 ));
1821 }
1822 let envelope = config.to_envelope();
1823 let result = self.send(envelope, None).await;
1824 match result {
1825 Ok(m) => {
1826 let data = match m.data {
1827 Some(data) => data,
1828 None => {
1829 return Err(OpenIAPError::ClientError("No data returned".to_string()));
1830 }
1831 };
1832 if m.command == "error" {
1833 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1834 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1835 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1836 }
1837 Ok(())
1838 }
1839 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1840 }
1841 }
1842 pub async fn drop_index(&self, config: DropIndexRequest) -> Result<(), OpenIAPError> {
1845 if config.collectionname.is_empty() {
1846 return Err(OpenIAPError::ClientError(
1847 "No collection name provided".to_string(),
1848 ));
1849 }
1850 if config.name.is_empty() {
1851 return Err(OpenIAPError::ClientError(
1852 "No index name provided".to_string(),
1853 ));
1854 }
1855 let envelope = config.to_envelope();
1856 let result = self.send(envelope, None).await;
1857 match result {
1858 Ok(m) => {
1859 let data = match m.data {
1860 Some(data) => data,
1861 None => {
1862 return Err(OpenIAPError::ClientError("No data returned".to_string()));
1863 }
1864 };
1865 if m.command == "error" {
1866 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1867 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1868 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1869 }
1870 Ok(())
1871 }
1872 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1873 }
1874 }
1875 #[tracing::instrument(skip_all)]
1913 pub async fn query(&self, mut config: QueryRequest) -> Result<QueryResponse, OpenIAPError> {
1914 if config.collectionname.is_empty() {
1915 config.collectionname = "entities".to_string();
1916 }
1917
1918 let envelope = config.to_envelope();
1919 let result = self.send(envelope, None).await;
1920 match result {
1921 Ok(m) => {
1922 let data = match m.data {
1923 Some(data) => data,
1924 None => {
1925 return Err(OpenIAPError::ClientError("No data returned".to_string()));
1926 }
1927 };
1928 if m.command == "error" {
1929 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1930 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1931 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1932 }
1933 let response: QueryResponse = prost::Message::decode(data.value.as_ref())
1934 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1935 debug!("Return Ok(response)");
1936 Ok(response)
1937 }
1938 Err(e) => {
1939 debug!("Error !!");
1940 Err(OpenIAPError::ClientError(e.to_string()))
1941 }
1942 }
1943 }
1944 #[tracing::instrument(skip_all)]
1970 pub async fn get_one(&self, mut config: QueryRequest) -> Option<serde_json::Value> {
1971 if config.collectionname.is_empty() {
1972 config.collectionname = "entities".to_string();
1973 }
1974 config.top = 1;
1975 let envelope = config.to_envelope();
1976 let result = self.send(envelope, None).await;
1977 match result {
1978 Ok(m) => {
1979 let data = match m.data {
1980 Some(data) => data,
1981 None => return None,
1982 };
1983 if m.command == "error" {
1984 return None;
1985 }
1986 let response: QueryResponse = prost::Message::decode(data.value.as_ref()).ok()?;
1987
1988 let items: serde_json::Value = serde_json::from_str(&response.results).unwrap();
1989 let items: &Vec<serde_json::Value> = items.as_array().unwrap();
1990 if items.is_empty() {
1991 return None;
1992 }
1993 let item = items[0].clone();
1994 Some(item)
1995 }
1996 Err(_) => None,
1997 }
1998 }
1999
2000 #[tracing::instrument(skip_all)]
2110 pub async fn get_document_version(
2111 &self,
2112 mut config: GetDocumentVersionRequest,
2113 ) -> Result<String, OpenIAPError> {
2114 if config.collectionname.is_empty() {
2115 config.collectionname = "entities".to_string();
2116 }
2117 if config.id.is_empty() {
2118 return Err(OpenIAPError::ClientError("No id provided".to_string()));
2119 }
2120 let envelope = config.to_envelope();
2121 let result = self.send(envelope, None).await;
2122 match result {
2123 Ok(m) => {
2124 let data = match m.data {
2125 Some(data) => data,
2126 None => {
2127 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2128 }
2129 };
2130 if m.command == "error" {
2131 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2132 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2133 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2134 }
2135 let response: GetDocumentVersionResponse =
2136 prost::Message::decode(data.value.as_ref())
2137 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2138 Ok(response.result)
2139 }
2140 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2141 }
2142 }
2143 #[tracing::instrument(skip_all)]
2162 pub async fn aggregate(
2163 &self,
2164 mut config: AggregateRequest,
2165 ) -> Result<AggregateResponse, OpenIAPError> {
2166 if config.collectionname.is_empty() {
2167 config.collectionname = "entities".to_string();
2168 }
2169 if config.hint.is_empty() {
2170 config.hint = "".to_string();
2171 }
2172 if config.queryas.is_empty() {
2173 config.queryas = "".to_string();
2174 }
2175 if config.aggregates.is_empty() {
2176 return Err(OpenIAPError::ClientError(
2177 "No aggregates provided".to_string(),
2178 ));
2179 }
2180 let envelope = config.to_envelope();
2181 let result = self.send(envelope, None).await;
2182 match result {
2183 Ok(m) => {
2184 let data = match m.data {
2185 Some(data) => data,
2186 None => {
2187 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2188 }
2189 };
2190 if m.command == "error" {
2191 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2192 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2193 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2194 }
2195 let response: AggregateResponse = prost::Message::decode(data.value.as_ref())
2196 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2197 Ok(response)
2198 }
2199 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2200 }
2201 }
2202 #[tracing::instrument(skip_all)]
2204 pub async fn count(&self, mut config: CountRequest) -> Result<CountResponse, OpenIAPError> {
2205 if config.collectionname.is_empty() {
2206 config.collectionname = "entities".to_string();
2207 }
2208 if config.query.is_empty() {
2209 config.query = "{}".to_string();
2210 }
2211 let envelope = config.to_envelope();
2212 let result = self.send(envelope, None).await;
2213 match result {
2214 Ok(m) => {
2215 let data = match m.data {
2216 Some(data) => data,
2217 None => {
2218 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2219 }
2220 };
2221 if m.command == "error" {
2222 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2223 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2224 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2225 }
2226 let response: CountResponse = prost::Message::decode(data.value.as_ref())
2227 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2228 Ok(response)
2229 }
2230 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2231 }
2232 }
2233 #[tracing::instrument(skip_all)]
2235 pub async fn distinct(
2236 &self,
2237 mut config: DistinctRequest,
2238 ) -> Result<DistinctResponse, OpenIAPError> {
2239 if config.collectionname.is_empty() {
2240 config.collectionname = "entities".to_string();
2241 }
2242 if config.query.is_empty() {
2243 config.query = "{}".to_string();
2244 }
2245 if config.field.is_empty() {
2246 return Err(OpenIAPError::ClientError("No field provided".to_string()));
2247 }
2248 let envelope = config.to_envelope();
2249 let result = self.send(envelope, None).await;
2250 match result {
2251 Ok(m) => {
2252 let data = match m.data {
2253 Some(data) => data,
2254 None => {
2255 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2256 }
2257 };
2258 if m.command == "error" {
2259 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2260 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2261 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2262 }
2263 let response: DistinctResponse = prost::Message::decode(data.value.as_ref())
2264 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2265 Ok(response)
2266 }
2267 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2268 }
2269 }
2270 #[tracing::instrument(skip_all)]
2272 pub async fn insert_one(
2273 &self,
2274 config: InsertOneRequest,
2275 ) -> Result<InsertOneResponse, OpenIAPError> {
2276 let envelope = config.to_envelope();
2277 let result = self.send(envelope, None).await;
2278 match result {
2279 Ok(m) => {
2280 let data = match m.data {
2281 Some(data) => data,
2282 None => {
2283 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2284 }
2285 };
2286 if m.command == "error" {
2287 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2288 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2289 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2290 }
2291 let response: InsertOneResponse = prost::Message::decode(data.value.as_ref())
2292 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2293 Ok(response)
2294 }
2295 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2296 }
2297 }
2298 #[tracing::instrument(skip_all)]
2300 pub async fn insert_many(
2301 &self,
2302 config: InsertManyRequest,
2303 ) -> Result<InsertManyResponse, OpenIAPError> {
2304 let envelope = config.to_envelope();
2305 let result = self.send(envelope, None).await;
2306 match result {
2307 Ok(m) => {
2308 let data = match m.data {
2309 Some(data) => data,
2310 None => {
2311 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2312 }
2313 };
2314 if m.command == "error" {
2315 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2316 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2317 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2318 }
2319 let response: InsertManyResponse = prost::Message::decode(data.value.as_ref())
2320 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2321 Ok(response)
2322 }
2323 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2324 }
2325 }
2326 #[tracing::instrument(skip_all)]
2328 pub async fn update_one(
2329 &self,
2330 config: UpdateOneRequest,
2331 ) -> Result<UpdateOneResponse, OpenIAPError> {
2332 let envelope = config.to_envelope();
2333 let result = self.send(envelope, None).await;
2334 match result {
2335 Ok(m) => {
2336 let data = match m.data {
2337 Some(data) => data,
2338 None => {
2339 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2340 }
2341 };
2342 if m.command == "error" {
2343 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2344 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2345 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2346 }
2347 let response: UpdateOneResponse = prost::Message::decode(data.value.as_ref())
2348 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2349 Ok(response)
2350 }
2351 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2352 }
2353 }
2354 #[tracing::instrument(skip_all)]
2356 pub async fn insert_or_update_one(
2357 &self,
2358 config: InsertOrUpdateOneRequest,
2359 ) -> Result<String, OpenIAPError> {
2360 let envelope = config.to_envelope();
2361 let result = self.send(envelope, None).await;
2362 match result {
2363 Ok(m) => {
2364 let data = match m.data {
2365 Some(data) => data,
2366 None => {
2367 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2368 }
2369 };
2370 if m.command == "error" {
2371 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2372 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2373 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2374 }
2375 let response: InsertOrUpdateOneResponse =
2376 prost::Message::decode(data.value.as_ref())
2377 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2378 Ok(response.result)
2379 }
2380 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2381 }
2382 }
2383 #[tracing::instrument(skip_all)]
2385 pub async fn insert_or_update_many(
2386 &self,
2387 config: InsertOrUpdateManyRequest,
2388 ) -> Result<InsertOrUpdateManyResponse, OpenIAPError> {
2389 let envelope = config.to_envelope();
2390 let result = self.send(envelope, None).await;
2391 match result {
2392 Ok(m) => {
2393 let data = match m.data {
2394 Some(data) => data,
2395 None => {
2396 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2397 }
2398 };
2399 if m.command == "error" {
2400 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2401 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2402 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2403 }
2404 let response: InsertOrUpdateManyResponse =
2405 prost::Message::decode(data.value.as_ref())
2406 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2407 Ok(response)
2408 }
2409 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2410 }
2411 }
2412 #[tracing::instrument(skip_all)]
2414 pub async fn update_document(
2415 &self,
2416 config: UpdateDocumentRequest,
2417 ) -> Result<UpdateDocumentResponse, OpenIAPError> {
2418 let envelope = config.to_envelope();
2419 let result = self.send(envelope, None).await;
2420 match result {
2421 Ok(m) => {
2422 let data = match m.data {
2423 Some(data) => data,
2424 None => {
2425 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2426 }
2427 };
2428 if m.command == "error" {
2429 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2430 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2431 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2432 }
2433 let response: UpdateDocumentResponse = prost::Message::decode(data.value.as_ref())
2434 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2435 Ok(response)
2436 }
2437 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2438 }
2439 }
2440 #[tracing::instrument(skip_all)]
2442 pub async fn delete_one(&self, config: DeleteOneRequest) -> Result<i32, OpenIAPError> {
2443 let envelope = config.to_envelope();
2444 let result = self.send(envelope, None).await;
2445 match result {
2446 Ok(m) => {
2447 let data = match m.data {
2448 Some(data) => data,
2449 None => {
2450 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2451 }
2452 };
2453 if m.command == "error" {
2454 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2455 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2456 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2457 }
2458 let response: DeleteOneResponse = prost::Message::decode(data.value.as_ref())
2459 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2460 Ok(response.affectedrows)
2461 }
2462 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2463 }
2464 }
2465 #[tracing::instrument(skip_all)]
2467 pub async fn delete_many(&self, config: DeleteManyRequest) -> Result<i32, OpenIAPError> {
2468 let envelope = config.to_envelope();
2469 let result = self.send(envelope, None).await;
2470 match result {
2471 Ok(m) => {
2472 let data = match m.data {
2473 Some(data) => data,
2474 None => {
2475 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2476 }
2477 };
2478 if m.command == "error" {
2479 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2480 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2481 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2482 }
2483 let response: DeleteManyResponse = prost::Message::decode(data.value.as_ref())
2484 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2485 Ok(response.affectedrows)
2486 }
2487 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2488 }
2489 }
2490 #[tracing::instrument(skip_all)]
2492 pub async fn download(
2493 &self,
2494 config: DownloadRequest,
2495 folder: Option<&str>,
2496 filename: Option<&str>,
2497 ) -> Result<DownloadResponse, OpenIAPError> {
2498 let envelope = config.to_envelope();
2499 match self.sendwithstream(envelope).await {
2500 Ok((response_rx, mut stream_rx)) => {
2501 let temp_file_path = util::generate_unique_filename("openiap");
2502 debug!("Temp file: {:?}", temp_file_path);
2503 let mut temp_file = File::create(&temp_file_path).map_err(|e| {
2504 OpenIAPError::ClientError(format!("Failed to create temp file: {}", e))
2505 })?;
2506 while !stream_rx.is_closed() {
2507 match stream_rx.recv().await {
2508 Some(received) => {
2509 if received.is_empty() {
2510 debug!("Stream closed");
2511 break;
2512 }
2513 debug!("Received {} bytes", received.len());
2514 temp_file.write_all(&received).map_err(|e| {
2515 OpenIAPError::ClientError(format!(
2516 "Failed to write to temp file: {}",
2517 e
2518 ))
2519 })?;
2520 }
2521 None => {
2522 debug!("Stream closed");
2523 break;
2524 }
2525 }
2526 }
2527 temp_file.sync_all().map_err(|e| {
2528 OpenIAPError::ClientError(format!("Failed to sync temp file: {}", e))
2529 })?;
2530
2531 let response = response_rx.await.map_err(|_| {
2532 OpenIAPError::ClientError("Failed to receive response".to_string())
2533 })?;
2534
2535 if response.command == "error" {
2536 let data = match response.data {
2537 Some(data) => data,
2538 None => {
2539 return Err(OpenIAPError::ClientError(
2540 "No data returned for SERVER error".to_string(),
2541 ));
2542 }
2543 };
2544 let e: ErrorResponse = prost::Message::decode(data.value.as_ref()).unwrap();
2545 return Err(OpenIAPError::ServerError(e.message));
2546 }
2547 let mut downloadresponse: DownloadResponse =
2548 prost::Message::decode(response.data.unwrap().value.as_ref()).unwrap();
2549
2550 let mut final_filename = match &filename {
2551 Some(f) => f,
2552 None => downloadresponse.filename.as_str(),
2553 };
2554 if final_filename.is_empty() {
2555 final_filename = downloadresponse.filename.as_str();
2556 }
2557 let mut folder = match &folder {
2558 Some(f) => f,
2559 None => ".",
2560 };
2561 if folder.is_empty() {
2562 folder = ".";
2563 }
2564 let filepath = format!("{}/{}", folder, final_filename);
2565 trace!("Moving file to {}", filepath);
2566 util::move_file(temp_file_path.to_str().unwrap(), filepath.as_str()).map_err(|e| {
2567 OpenIAPError::ClientError(format!("Failed to move file: {}", e))
2568 })?;
2569 debug!("Downloaded file to {}", filepath);
2570 downloadresponse.filename = filepath;
2571
2572 Ok(downloadresponse)
2573 }
2574 Err(status) => Err(OpenIAPError::ClientError(status.to_string())),
2575 }
2576 }
2577 #[tracing::instrument(skip_all)]
2579 pub async fn upload(
2580 &self,
2581 config: UploadRequest,
2582 filepath: &str,
2583 ) -> Result<UploadResponse, OpenIAPError> {
2584 debug!("upload: Uploading file: {}", filepath);
2643 let mut file = File::open(filepath)
2644 .map_err(|e| OpenIAPError::ClientError(format!("Failed to open file: {}", e)))?;
2645 let chunk_size = 1024 * 1024;
2646 let mut buffer = vec![0; chunk_size];
2647
2648 let envelope = config.to_envelope();
2650 let (response_rx, rid) = self.send_noawait(envelope).await?;
2651
2652 let envelope = BeginStream::from_rid(rid.clone());
2654 debug!("Sending beginstream to #{}", rid);
2655 if let Err(e) = self.send_envelope(envelope).await {
2656 let inner = self.inner.lock().await;
2657 inner.queries.lock().await.remove(&rid);
2658 return Err(OpenIAPError::ClientError(format!("Failed to send data: {}", e)));
2659 }
2660
2661 let mut counter = 0;
2663 loop {
2664 let bytes_read = file.read(&mut buffer).map_err(|e| {
2665 OpenIAPError::ClientError(format!("Failed to read from file: {}", e))
2666 })?;
2667 counter += 1;
2668
2669 if bytes_read == 0 {
2670 break;
2671 }
2672
2673 let chunk = buffer[..bytes_read].to_vec();
2674 let envelope = Stream::from_rid(chunk, rid.clone());
2675 debug!("Sending chunk {} stream to #{}", counter, envelope.rid);
2676 if let Err(e) = self.send_envelope(envelope).await {
2677 let inner = self.inner.lock().await;
2678 inner.queries.lock().await.remove(&rid);
2679 return Err(OpenIAPError::ClientError(format!("Failed to send data: {}", e)));
2680 }
2681 }
2682
2683 let envelope = EndStream::from_rid(rid.clone());
2685 debug!("Sending endstream to #{}", rid);
2686 if let Err(e) = self.send_envelope(envelope).await {
2687 let inner = self.inner.lock().await;
2688 inner.queries.lock().await.remove(&rid);
2689 return Err(OpenIAPError::ClientError(format!("Failed to send data: {}", e)));
2690 }
2691
2692 debug!("Wait for upload response for #{}", rid);
2694 let result = response_rx.await;
2695 let inner = self.inner.lock().await;
2696 inner.queries.lock().await.remove(&rid);
2697
2698 match result {
2699 Ok(response) => {
2700 if response.command == "error" {
2701 let error_response: ErrorResponse = prost::Message::decode(
2702 response.data.unwrap().value.as_ref(),
2703 )
2704 .map_err(|e| {
2705 OpenIAPError::ClientError(format!("Failed to decode ErrorResponse: {}", e))
2706 })?;
2707 return Err(OpenIAPError::ServerError(error_response.message));
2708 }
2709 let upload_response: UploadResponse =
2710 prost::Message::decode(response.data.unwrap().value.as_ref()).map_err(|e| {
2711 OpenIAPError::ClientError(format!("Failed to decode UploadResponse: {}", e))
2712 })?;
2713 Ok(upload_response)
2714 }
2715 Err(e) => Err(OpenIAPError::CustomError(e.to_string())),
2716 }
2717 }
2718 #[tracing::instrument(skip_all)]
2720 pub async fn watch(
2721 &self,
2722 mut config: WatchRequest,
2723 callback: Box<dyn Fn(WatchEvent) + Send + Sync>,
2724 ) -> Result<String, OpenIAPError> {
2725 if config.collectionname.is_empty() {
2726 config.collectionname = "entities".to_string();
2727 }
2728 if config.paths.is_empty() {
2729 config.paths = vec!["".to_string()];
2730 }
2731
2732 let envelope = config.to_envelope();
2733 let result = self.send(envelope, None).await;
2734 match result {
2735 Ok(m) => {
2736 let data = match m.data {
2737 Some(data) => data,
2738 None => {
2739 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2740 }
2741 };
2742 if m.command == "error" {
2743 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2744 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2745 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2746 }
2747 let response: WatchResponse = prost::Message::decode(data.value.as_ref())
2748 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2749
2750 let inner = self.inner.lock().await;
2751 inner
2752 .watches
2753 .lock()
2754 .await
2755 .insert(response.id.clone(), callback);
2756
2757 Ok(response.id)
2758 }
2759 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2760 }
2761 }
2762 #[tracing::instrument(skip_all)]
2764 pub async fn unwatch(&self, id: &str) -> Result<(), OpenIAPError> {
2765 let config = UnWatchRequest::byid(id);
2766 let envelope = config.to_envelope();
2767 let result = self.send(envelope, None).await;
2768 match result {
2769 Ok(m) => {
2770 let data = match m.data {
2771 Some(data) => data,
2772 None => {
2773 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2774 }
2775 };
2776 if m.command == "error" {
2777 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2778 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2779 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2780 }
2781 Ok(())
2782 }
2783 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2784 }
2785 }
2786 #[tracing::instrument(skip_all)]
2788 pub async fn register_queue(
2789 &self,
2790 mut config: RegisterQueueRequest,
2791 callback: QueueCallbackFn,
2792 ) -> Result<String, OpenIAPError> {
2793 if config.queuename.is_empty() {
2794 config.queuename = "".to_string();
2795 }
2796
2797 let envelope = config.to_envelope();
2798 let result = self.send(envelope, None).await;
2799 match result {
2800 Ok(m) => {
2801 let data = match m.data {
2802 Some(data) => data,
2803 None => {
2804 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2805 }
2806 };
2807 if m.command == "error" {
2808 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2809 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2810 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2811 }
2812 let response: RegisterQueueResponse =
2813 prost::Message::decode(data.value.as_ref())
2814 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2815
2816 let inner = self.inner.lock().await;
2817 inner
2818 .queues
2819 .lock()
2820 .await
2821 .insert(response.queuename.clone(), callback);
2822
2823 Ok(response.queuename)
2824 }
2825 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2826 }
2827 }
2828 #[tracing::instrument(skip_all)]
2830 pub async fn unregister_queue(&self, queuename: &str) -> Result<(), OpenIAPError> {
2831 let config = UnRegisterQueueRequest::byqueuename(queuename);
2832 let envelope = config.to_envelope();
2833 let result = self.send(envelope, None).await;
2834 match result {
2835 Ok(m) => {
2836 let data = match m.data {
2837 Some(data) => data,
2838 None => {
2839 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2840 }
2841 };
2842 if m.command == "error" {
2843 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2844 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2845 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2846 }
2847 Ok(())
2848 }
2849 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2850 }
2851 }
2852 #[tracing::instrument(skip_all)]
2854 pub async fn register_exchange(
2855 &self,
2856 mut config: RegisterExchangeRequest,
2857 callback: QueueCallbackFn,
2858 ) -> Result<String, OpenIAPError> {
2859 if config.exchangename.is_empty() {
2860 return Err(OpenIAPError::ClientError(
2861 "No exchange name provided".to_string(),
2862 ));
2863 }
2864 if config.algorithm.is_empty() {
2865 config.algorithm = "fanout".to_string();
2866 }
2867 let envelope = config.to_envelope();
2868 let result = self.send(envelope, None).await;
2869 match result {
2870 Ok(m) => {
2871 let data = match m.data {
2872 Some(data) => data,
2873 None => {
2874 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2875 }
2876 };
2877 if m.command == "error" {
2878 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2879 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2880 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2881 }
2882 let response: RegisterExchangeResponse =
2883 prost::Message::decode(data.value.as_ref())
2884 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2885 if !response.queuename.is_empty() {
2886 let inner = self.inner.lock().await;
2887 inner
2888 .queues
2889 .lock()
2890 .await
2891 .insert(response.queuename.clone(), callback);
2892 }
2893 Ok(response.queuename)
2894 }
2895 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2896 }
2897 }
2898 #[tracing::instrument(skip_all)]
2900 pub async fn queue_message(
2901 &self,
2902 config: QueueMessageRequest,
2903 ) -> Result<QueueMessageResponse, OpenIAPError> {
2904 if config.queuename.is_empty() && config.exchangename.is_empty() {
2905 return Err(OpenIAPError::ClientError(
2906 "No queue or exchange name provided".to_string(),
2907 ));
2908 }
2909 let envelope = config.to_envelope();
2910 let result = self.send(envelope, None).await;
2911 match result {
2912 Ok(m) => {
2913 let data = match m.data {
2914 Some(d) => d,
2915 None => {
2916 return Err(OpenIAPError::ClientError("No data in response".to_string()))
2917 }
2918 };
2919 if m.command == "error" {
2920 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2921 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2922 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2923 }
2924 let response: QueueMessageResponse = prost::Message::decode(data.value.as_ref())
2925 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2926 Ok(response)
2927 }
2928 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2929 }
2930 }
2931 #[tracing::instrument(skip_all)]
2933 pub async fn rpc(&self, mut config: QueueMessageRequest, timeout: tokio::time::Duration) -> Result<String, OpenIAPError> {
2934 if config.queuename.is_empty() && config.exchangename.is_empty() {
2935 return Err(OpenIAPError::ClientError(
2936 "No queue or exchange name provided".to_string(),
2937 ));
2938 }
2939
2940 let (tx, rx) = oneshot::channel::<String>();
2941 let tx = Arc::new(std::sync::Mutex::new(Some(tx)));
2942
2943 let callback: QueueCallbackFn = Arc::new(move |_client: Arc<Client>, event: QueueEvent| {
2945 if let Some(tx) = tx.lock().unwrap().take() {
2946 let _ = tx.send(event.data);
2947 } else {
2948 debug!("Queue already closed");
2949 }
2950 Box::pin(async { None })
2951 });
2952
2953 let mut reply_queue_guard = self.rpc_reply_queue.lock().await;
2955 let mut callback_guard = self.rpc_callback.lock().await;
2956
2957 let state = self.get_state();
2959 if reply_queue_guard.is_none() || !(state == ClientState::Connected || state == ClientState::Signedin) {
2960 let q = self
2962 .register_queue(
2963 RegisterQueueRequest {
2964 queuename: "".to_string(),
2965 },
2966 callback.clone(),
2967 )
2968 .await?;
2969 *reply_queue_guard = Some(q.clone());
2970 *callback_guard = Some(callback.clone());
2971 } else {
2972 if let Some(qname) = reply_queue_guard.as_ref() {
2974 let inner = self.inner.lock().await;
2975 inner.queues.lock().await.insert(qname.clone(), callback.clone());
2976 }
2977 }
2978
2979 let q = reply_queue_guard.as_ref().unwrap().clone();
2980 config.replyto = q.clone();
2981 let envelope = config.to_envelope();
2982
2983 let result = self.send(envelope, None).await;
2984 let rpc_result = match result {
2985 Ok(m) => {
2986 let data = match m.data {
2987 Some(d) => d,
2988 None => {
2989 return Err(OpenIAPError::ClientError("No data in response".to_string()))
2990 }
2991 };
2992 if m.command == "error" {
2993 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2994 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2995 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2996 }
2997
2998 match tokio::time::timeout(timeout, rx).await {
2999 Ok(Ok(val)) => Ok(val),
3000 Ok(Err(e)) => Err(OpenIAPError::CustomError(e.to_string())),
3001 Err(_) => {
3002 *reply_queue_guard = None;
3004 *callback_guard = None;
3005 Err(OpenIAPError::ClientError("RPC request timed out".to_string()))
3006 },
3007 }
3008 }
3009 Err(e) => {
3010 *reply_queue_guard = None;
3012 *callback_guard = None;
3013 Err(OpenIAPError::ClientError(e.to_string()))
3014 }
3015 };
3016
3017 rpc_result
3018 }
3019 #[tracing::instrument(skip_all)]
3093 pub async fn push_workitem(
3094 &self,
3095 mut config: PushWorkitemRequest,
3096 ) -> Result<PushWorkitemResponse, OpenIAPError> {
3097 if config.wiq.is_empty() && config.wiqid.is_empty() {
3098 return Err(OpenIAPError::ClientError(
3099 "No queue name or id provided".to_string(),
3100 ));
3101 }
3102 for f in &mut config.files {
3103 if f.filename.is_empty() && f.file.is_empty() {
3104 debug!("Filename is empty");
3105 } else if !f.filename.is_empty() && f.file.is_empty() && f.id.is_empty() {
3106 if !std::path::Path::new(&f.filename).exists() {
3108 debug!("File does not exist: {}", f.filename);
3109 } else {
3110 let filesize = std::fs::metadata(&f.filename).unwrap().len();
3111 if filesize < 5 * 1024 * 1024 {
3113 debug!("File {} exists so ATTACHING it.", f.filename);
3114 let filename = std::path::Path::new(&f.filename)
3115 .file_name()
3116 .unwrap()
3117 .to_str()
3118 .unwrap();
3119 f.file = std::fs::read(&f.filename).unwrap();
3120 f.file = util::compress_file_to_vec(&f.filename).unwrap();
3123 f.compressed = true;
3124 f.filename = filename.to_string();
3125 f.id = "findme".to_string();
3126 trace!(
3127 "File {} was read and assigned to f.file, size: {}",
3128 f.filename,
3129 f.file.len()
3130 );
3131 } else {
3132 debug!("File {} exists so UPLOADING it.", f.filename);
3133 let filename = std::path::Path::new(&f.filename)
3134 .file_name()
3135 .unwrap()
3136 .to_str()
3137 .unwrap();
3138 let uploadconfig = UploadRequest {
3139 filename: filename.to_string(),
3140 collectionname: "fs.files".to_string(),
3141 ..Default::default()
3142 };
3143 let uploadresult = self.upload(uploadconfig, &f.filename).await.unwrap();
3144 trace!("File {} was upload as {}", filename, uploadresult.id);
3145 f.id = uploadresult.id.clone();
3147 f.filename = filename.to_string();
3148 }
3149 }
3150 } else {
3151 debug!("File {} is already uploaded", f.filename);
3152 }
3153 }
3154 let envelope = config.to_envelope();
3155 let result = self.send(envelope, None).await;
3156 match result {
3157 Ok(m) => {
3158 let data = match m.data {
3159 Some(data) => data,
3160 None => {
3161 return Err(OpenIAPError::ClientError("No data returned".to_string()));
3162 }
3163 };
3164 if m.command == "error" {
3165 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3166 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3167 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3168 }
3169 let response: PushWorkitemResponse = prost::Message::decode(data.value.as_ref())
3170 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3171 Ok(response)
3172 }
3173 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3174 }
3175 }
3176 #[tracing::instrument(skip_all)]
3180 pub async fn push_workitems(
3181 &self,
3182 mut config: PushWorkitemsRequest,
3183 ) -> Result<PushWorkitemsResponse, OpenIAPError> {
3184 if config.wiq.is_empty() && config.wiqid.is_empty() {
3185 return Err(OpenIAPError::ClientError(
3186 "No queue name or id provided".to_string(),
3187 ));
3188 }
3189 for wi in &mut config.items {
3190 for f in &mut wi.files {
3191 if f.filename.is_empty() && f.file.is_empty() {
3192 debug!("Filename is empty");
3193 } else if !f.filename.is_empty() && f.file.is_empty() && f.id.is_empty() {
3194 if !std::path::Path::new(&f.filename).exists() {
3196 debug!("File does not exist: {}", f.filename);
3197 } else {
3198 let filesize = std::fs::metadata(&f.filename).unwrap().len();
3199 if filesize < 5 * 1024 * 1024 {
3201 debug!("File {} exists so ATTACHING it.", f.filename);
3202 let filename = std::path::Path::new(&f.filename)
3203 .file_name()
3204 .unwrap()
3205 .to_str()
3206 .unwrap();
3207 f.file = std::fs::read(&f.filename).unwrap();
3208 f.file = util::compress_file_to_vec(&f.filename).unwrap();
3211 f.compressed = true;
3212 f.filename = filename.to_string();
3213 f.id = "findme".to_string();
3214 trace!(
3215 "File {} was read and assigned to f.file, size: {}",
3216 f.filename,
3217 f.file.len()
3218 );
3219 } else {
3220 debug!("File {} exists so UPLOADING it.", f.filename);
3221 let filename = std::path::Path::new(&f.filename)
3222 .file_name()
3223 .unwrap()
3224 .to_str()
3225 .unwrap();
3226 let uploadconfig = UploadRequest {
3227 filename: filename.to_string(),
3228 collectionname: "fs.files".to_string(),
3229 ..Default::default()
3230 };
3231 let uploadresult =
3232 self.upload(uploadconfig, &f.filename).await.unwrap();
3233 trace!("File {} was upload as {}", filename, uploadresult.id);
3234 f.id = uploadresult.id.clone();
3236 f.filename = filename.to_string();
3237 }
3238 }
3239 } else {
3240 debug!("File {} is already uploaded", f.filename);
3241 }
3242 }
3243 }
3244 let envelope = config.to_envelope();
3245 let result = self.send(envelope, None).await;
3246 match result {
3247 Ok(m) => {
3248 let data = match m.data {
3249 Some(data) => data,
3250 None => {
3251 return Err(OpenIAPError::ClientError("No data returned".to_string()));
3252 }
3253 };
3254 if m.command == "error" {
3255 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3256 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3257 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3258 }
3259 let response: PushWorkitemsResponse =
3260 prost::Message::decode(data.value.as_ref())
3261 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3262 Ok(response)
3263 }
3264 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3265 }
3266 }
3267 #[tracing::instrument(skip_all)]
3270 pub async fn pop_workitem(
3271 &self,
3272 config: PopWorkitemRequest,
3273 downloadfolder: Option<&str>,
3274 ) -> Result<PopWorkitemResponse, OpenIAPError> {
3275 if config.wiq.is_empty() && config.wiqid.is_empty() {
3276 return Err(OpenIAPError::ClientError(
3277 "No queue name or id provided".to_string(),
3278 ));
3279 }
3280 let envelope = config.to_envelope();
3281 let result = self.send(envelope, None).await;
3282 match result {
3283 Ok(m) => {
3284 let data = match m.data {
3285 Some(data) => data,
3286 None => {
3287 return Err(OpenIAPError::ClientError("No data returned".to_string()));
3288 }
3289 };
3290 if m.command == "error" {
3291 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3292 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3293 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3294 }
3295 let response: PopWorkitemResponse = prost::Message::decode(data.value.as_ref())
3296 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3297
3298 match &response.workitem {
3299 Some(wi) => {
3300 for f in &wi.files {
3301 if !f.id.is_empty() {
3302 let downloadconfig = DownloadRequest {
3303 id: f.id.clone(),
3304 collectionname: "fs.files".to_string(),
3305 ..Default::default()
3306 };
3307 let downloadresult =
3308 match self.download(downloadconfig, downloadfolder, None).await
3309 {
3310 Ok(r) => r,
3311 Err(e) => {
3312 debug!("Failed to download file: {}", e);
3313 continue;
3314 }
3315 };
3316 debug!(
3317 "File {} was downloaded as {}",
3318 f.filename, downloadresult.filename
3319 );
3320 }
3321 }
3322 }
3323 None => {
3324 debug!("No workitem found");
3325 }
3326 }
3327 Ok(response)
3328 }
3329 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3330 }
3331 }
3332 #[tracing::instrument(skip_all)]
3338 pub async fn update_workitem(
3339 &self,
3340 mut config: UpdateWorkitemRequest,
3341 ) -> Result<UpdateWorkitemResponse, OpenIAPError> {
3342 match &config.workitem {
3343 Some(wiq) => {
3344 if wiq.id.is_empty() {
3345 return Err(OpenIAPError::ClientError(
3346 "No workitem id provided".to_string(),
3347 ));
3348 }
3349 }
3350 None => {
3351 return Err(OpenIAPError::ClientError(
3352 "No workitem provided".to_string(),
3353 ));
3354 }
3355 }
3356 for f in &mut config.files {
3357 if f.filename.is_empty() && f.file.is_empty() {
3358 debug!("Filename is empty");
3359 } else if !f.filename.is_empty() && f.file.is_empty() && f.id.is_empty() {
3360 if !std::path::Path::new(&f.filename).exists() {
3361 debug!("File does not exist: {}", f.filename);
3362 } else {
3363 debug!("File {} exists so uploading it.", f.filename);
3364 let filename = std::path::Path::new(&f.filename)
3365 .file_name()
3366 .unwrap()
3367 .to_str()
3368 .unwrap();
3369 let uploadconfig = UploadRequest {
3370 filename: filename.to_string(),
3371 collectionname: "fs.files".to_string(),
3372 ..Default::default()
3373 };
3374 let uploadresult = self.upload(uploadconfig, &f.filename).await.unwrap();
3375 trace!("File {} was upload as {}", filename, uploadresult.id);
3376 f.id = uploadresult.id.clone();
3377 f.filename = filename.to_string();
3378 }
3379 } else {
3380 debug!("Skipped file");
3381 }
3382 }
3383 let envelope = config.to_envelope();
3384 let result = self.send(envelope, None).await;
3385 match result {
3386 Ok(m) => {
3387 let data = match m.data {
3388 Some(d) => d,
3389 None => {
3390 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3391 }
3392 };
3393 if m.command == "error" {
3394 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3395 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3396 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3397 }
3398 let response: UpdateWorkitemResponse = prost::Message::decode(data.value.as_ref())
3399 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3400 Ok(response)
3401 }
3402 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3403 }
3404 }
3405 #[tracing::instrument(skip_all)]
3407 pub async fn delete_workitem(
3408 &self,
3409 config: DeleteWorkitemRequest,
3410 ) -> Result<DeleteWorkitemResponse, OpenIAPError> {
3411 if config.id.is_empty() {
3412 return Err(OpenIAPError::ClientError(
3413 "No workitem id provided".to_string(),
3414 ));
3415 }
3416 let envelope = config.to_envelope();
3417 let result = self.send(envelope, None).await;
3418 match result {
3419 Ok(m) => {
3420 let data = match m.data {
3421 Some(d) => d,
3422 None => {
3423 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3424 }
3425 };
3426 if m.command == "error" {
3427 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3428 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3429 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3430 }
3431 let response: DeleteWorkitemResponse = prost::Message::decode(data.value.as_ref())
3432 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3433 Ok(response)
3434 }
3435 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3436 }
3437 }
3438 #[tracing::instrument(skip_all)]
3440 pub async fn add_workitem_queue(
3441 &self,
3442 config: AddWorkItemQueueRequest,
3443 ) -> Result<WorkItemQueue, OpenIAPError> {
3444 if config.workitemqueue.is_none() {
3445 return Err(OpenIAPError::ClientError(
3446 "No workitem queue name provided".to_string(),
3447 ));
3448 }
3449 let envelope = config.to_envelope();
3450 let result = self.send(envelope, None).await;
3451 match result {
3452 Ok(m) => {
3453 let data = match m.data {
3454 Some(d) => d,
3455 None => {
3456 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3457 }
3458 };
3459 if m.command == "error" {
3460 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3461 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3462 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3463 }
3464 let response: AddWorkItemQueueResponse =
3465 prost::Message::decode(data.value.as_ref())
3466 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3467 match response.workitemqueue {
3468 Some(wiq) => Ok(wiq),
3469 None => {
3470 return Err(OpenIAPError::ClientError(
3471 "No workitem queue returned".to_string(),
3472 ));
3473 }
3474 }
3475 }
3476 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3477 }
3478 }
3479 #[tracing::instrument(skip_all)]
3481 pub async fn update_workitem_queue(
3482 &self,
3483 config: UpdateWorkItemQueueRequest,
3484 ) -> Result<WorkItemQueue, OpenIAPError> {
3485 if config.workitemqueue.is_none() {
3486 return Err(OpenIAPError::ClientError(
3487 "No workitem queue name provided".to_string(),
3488 ));
3489 }
3490 let envelope = config.to_envelope();
3491 let result = self.send(envelope, None).await;
3492 match result {
3493 Ok(m) => {
3494 let data = match m.data {
3495 Some(d) => d,
3496 None => {
3497 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3498 }
3499 };
3500 if m.command == "error" {
3501 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3502 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3503 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3504 }
3505 let response: UpdateWorkItemQueueResponse =
3506 prost::Message::decode(data.value.as_ref())
3507 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3508 match response.workitemqueue {
3509 Some(wiq) => Ok(wiq),
3510 None => {
3511 return Err(OpenIAPError::ClientError(
3512 "No workitem queue returned".to_string(),
3513 ));
3514 }
3515 }
3516 }
3517 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3518 }
3519 }
3520 #[tracing::instrument(skip_all)]
3522 pub async fn delete_workitem_queue(
3523 &self,
3524 config: DeleteWorkItemQueueRequest,
3525 ) -> Result<(), OpenIAPError> {
3526 if config.wiq.is_empty() && config.wiqid.is_empty() {
3527 return Err(OpenIAPError::ClientError(
3528 "No workitem queue name or id provided".to_string(),
3529 ));
3530 }
3531 let envelope = config.to_envelope();
3532 let result = self.send(envelope, None).await;
3533 match result {
3534 Ok(m) => {
3535 let data = match m.data {
3536 Some(d) => d,
3537 None => {
3538 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3539 }
3540 };
3541 if m.command == "error" {
3542 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3543 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3544 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3545 }
3546 Ok(())
3547 }
3548 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3549 }
3550 }
3551 #[tracing::instrument(skip_all)]
3553 pub async fn custom_command(
3554 &self,
3555 config: CustomCommandRequest,
3556 timeout: Option<tokio::time::Duration>,
3557 ) -> Result<String, OpenIAPError> {
3558 if config.command.is_empty() {
3559 return Err(OpenIAPError::ClientError("No command provided".to_string()));
3560 }
3561 let envelope = config.to_envelope();
3562 let result = self.send(envelope, timeout).await;
3563 match result {
3564 Ok(m) => {
3565 let data = match m.data {
3566 Some(d) => d,
3567 None => {
3568 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3569 }
3570 };
3571 if m.command == "error" {
3572 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3573 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3574 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3575 }
3576 let response: CustomCommandResponse =
3577 prost::Message::decode(data.value.as_ref())
3578 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3579 Ok(response.result)
3580 }
3581 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3582 }
3583 }
3584 #[tracing::instrument(skip_all)]
3586 pub async fn delete_package(&self, packageid: &str) -> Result<(), OpenIAPError> {
3587 let config = DeletePackageRequest::byid(packageid);
3588 let envelope = config.to_envelope();
3589 let result = self.send(envelope, None).await;
3590 match result {
3591 Ok(m) => {
3592 let data = match m.data {
3593 Some(data) => data,
3594 None => {
3595 return Err(OpenIAPError::ClientError("No data returned".to_string()));
3596 }
3597 };
3598 if m.command == "error" {
3599 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3600 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3601 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3602 }
3603 Ok(())
3606 }
3607 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3608 }
3609 }
3610 #[tracing::instrument(skip_all)]
3612 pub async fn start_agent(&self, agentid: &str) -> Result<(), OpenIAPError> {
3613 let config = StartAgentRequest::byid(agentid);
3614 let envelope = config.to_envelope();
3615 let result = self.send(envelope, None).await;
3616 match result {
3617 Ok(m) => {
3618 let data = match m.data {
3619 Some(d) => d,
3620 None => {
3621 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3622 }
3623 };
3624 if m.command == "error" {
3625 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3626 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3627 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3628 }
3629 Ok(())
3632 }
3633 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3634 }
3635 }
3636 #[tracing::instrument(skip_all)]
3638 pub async fn stop_agent(&self, agentid: &str) -> Result<(), OpenIAPError> {
3639 let config = StopAgentRequest::byid(agentid);
3640 let envelope = config.to_envelope();
3641 let result = self.send(envelope, None).await;
3642 match result {
3643 Ok(m) => {
3644 let data = match m.data {
3645 Some(d) => d,
3646 None => {
3647 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3648 }
3649 };
3650 if m.command == "error" {
3651 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3652 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3653 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3654 }
3655 Ok(())
3658 }
3659 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3660 }
3661 }
3662 #[tracing::instrument(skip_all)]
3664 pub async fn delete_agent_pod(&self, agentid: &str, podname: &str) -> Result<(), OpenIAPError> {
3665 let config = DeleteAgentPodRequest::byid(agentid, podname);
3666 let envelope = config.to_envelope();
3667 let result = self.send(envelope, None).await;
3668 match result {
3669 Ok(m) => {
3670 let data = match m.data {
3671 Some(d) => d,
3672 None => {
3673 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3674 }
3675 };
3676 if m.command == "error" {
3677 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3678 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3679 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3680 }
3681 Ok(())
3684 }
3685 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3686 }
3687 }
3688 #[tracing::instrument(skip_all)]
3690 pub async fn delete_agent(&self, agentid: &str) -> Result<(), OpenIAPError> {
3691 let config = DeleteAgentRequest::byid(agentid);
3692 let envelope = config.to_envelope();
3693 let result = self.send(envelope, None).await;
3694 match result {
3695 Ok(m) => {
3696 let data = match m.data {
3697 Some(d) => d,
3698 None => {
3699 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3700 }
3701 };
3702 if m.command == "error" {
3703 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3704 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3705 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3706 }
3707 Ok(())
3710 }
3711 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3712 }
3713 }
3714 #[tracing::instrument(skip_all)]
3716 pub async fn get_agent_pods(&self, agentid: &str, stats: bool) -> Result<String, OpenIAPError> {
3717 let config = GetAgentPodsRequest::byid(agentid, stats);
3718 let envelope = config.to_envelope();
3719 let result = self.send(envelope, None).await;
3720 match result {
3721 Ok(m) => {
3722 let data = match m.data {
3723 Some(d) => d,
3724 None => {
3725 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3726 }
3727 };
3728 if m.command == "error" {
3729 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3730 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3731 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3732 }
3733 let response: GetAgentPodsResponse = prost::Message::decode(data.value.as_ref())
3734 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3735 Ok(response.results)
3736 }
3737 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3738 }
3739 }
3740 #[tracing::instrument(skip_all)]
3742 pub async fn get_agent_pod_logs(
3743 &self,
3744 agentid: &str,
3745 podname: &str,
3746 ) -> Result<String, OpenIAPError> {
3747 let config = GetAgentLogRequest::new(agentid, podname);
3748 let envelope = config.to_envelope();
3749 let result = self.send(envelope, None).await;
3750 match result {
3751 Ok(m) => {
3752 let data = match m.data {
3753 Some(d) => d,
3754 None => {
3755 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3756 }
3757 };
3758 if m.command == "error" {
3759 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3760 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3761 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3762 }
3763 let response: GetAgentLogResponse = prost::Message::decode(data.value.as_ref())
3764 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3765 Ok(response.result)
3766 }
3767 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3768 }
3769 }
3770
3771 #[tracing::instrument(skip_all)]
3774 pub async fn ensure_customer(
3775 &self,
3776 config: EnsureCustomerRequest,
3777 ) -> Result<EnsureCustomerResponse, OpenIAPError> {
3778 if config.customer.is_none() && config.stripe.is_none() {
3779 return Err(OpenIAPError::ClientError(
3780 "No customer or stripe provided".to_string(),
3781 ));
3782 }
3783 let envelope = config.to_envelope();
3784 let result = self.send(envelope, None).await;
3785 match result {
3786 Ok(m) => {
3787 let data = match m.data {
3788 Some(d) => d,
3789 None => {
3790 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3791 }
3792 };
3793 if m.command == "error" {
3794 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3795 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3796 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3797 }
3798 let response: EnsureCustomerResponse = prost::Message::decode(data.value.as_ref())
3799 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3800 Ok(response)
3801 }
3802 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3803 }
3804 }
3805 #[tracing::instrument(skip_all)]
3807 pub async fn create_workflow_instance(
3808 &self,
3809 config: CreateWorkflowInstanceRequest,
3810 ) -> Result<String, OpenIAPError> {
3811 if config.workflowid.is_empty() {
3812 return Err(OpenIAPError::ClientError(
3813 "No workflow id provided".to_string(),
3814 ));
3815 }
3816 let envelope = config.to_envelope();
3817 let result = self.send(envelope, None).await;
3818 match result {
3819 Ok(m) => {
3820 let data = match m.data {
3821 Some(d) => d,
3822 None => {
3823 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3824 }
3825 };
3826 if m.command == "error" {
3827 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3828 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3829 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3830 }
3831 let response: CreateWorkflowInstanceResponse =
3832 prost::Message::decode(data.value.as_ref())
3833 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3834 Ok(response.instanceid)
3835 }
3836 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3837 }
3838 }
3839
3840 #[tracing::instrument(skip_all)]
3842 pub async fn invoke_openrpa(
3843 &self,
3844 config: InvokeOpenRpaRequest,
3845 timeout: Option<tokio::time::Duration>,
3846 ) -> Result<String, OpenIAPError> {
3847 if config.robotid.is_empty() {
3848 return Err(OpenIAPError::ClientError(
3849 "No robot id provided".to_string(),
3850 ));
3851 }
3852 if config.workflowid.is_empty() {
3853 return Err(OpenIAPError::ClientError(
3854 "No workflow id provided".to_string(),
3855 ));
3856 }
3857
3858 let (tx, rx) = oneshot::channel::<String>();
3859 let tx = Arc::new(std::sync::Mutex::new(Some(tx)));
3860
3861 let q = self
3862 .register_queue(
3863 RegisterQueueRequest {
3864 queuename: "".to_string(),
3865 },
3866 Arc::new(move |_client: Arc<Client>, event: QueueEvent| {
3867 let tx = tx.clone();
3868 Box::pin(async move {
3869 let json = event.data.clone();
3870 let obj = serde_json::from_str::<serde_json::Value>(&json).unwrap();
3871 let command: String = obj["command"].as_str().unwrap().to_string();
3872 debug!("Received event: {:?}", event);
3873 if command.eq("invokesuccess") {
3874 debug!("Robot successfully started running workflow");
3875 } else if command.eq("invokeidle") {
3876 debug!("Workflow went idle");
3877 } else if command.eq("invokeerror") {
3878 debug!("Robot failed to run workflow");
3879 let tx = tx.lock().unwrap().take().unwrap();
3880 tx.send(event.data).unwrap();
3881 } else if command.eq("timeout") {
3882 debug!("No robot picked up the workflow");
3883 let tx = tx.lock().unwrap().take().unwrap();
3884 tx.send(event.data).unwrap();
3885 } else if command.eq("invokecompleted") {
3886 debug!("Robot completed running workflow");
3887 let tx = tx.lock().unwrap().take().unwrap();
3888 tx.send(event.data).unwrap();
3889 } else {
3890 let tx = tx.lock().unwrap().take().unwrap();
3891 tx.send(event.data).unwrap();
3892 }
3893 None
3894 })
3895 }),
3896 )
3897 .await
3898 .unwrap();
3899 debug!("Registered Response Queue: {:?}", q);
3900 let data = format!(
3901 "{{\"command\":\"invoke\",\"workflowid\":\"{}\",\"payload\": {}}}",
3902 config.workflowid, config.payload
3903 );
3904 debug!("Send Data: {}", data);
3905 debug!("To Queue: {} With reply to: {}", config.robotid, q);
3906 let config = QueueMessageRequest {
3907 queuename: config.robotid.clone(),
3908 replyto: q.clone(),
3909 data,
3910 ..Default::default()
3911 };
3912
3913 let envelope = config.to_envelope();
3914
3915 let result = self.send(envelope, timeout).await;
3916 match result {
3917 Ok(m) => {
3918 let data = match m.data {
3919 Some(d) => d,
3920 None => {
3921 return Err(OpenIAPError::ClientError("No data in response".to_string()))
3922 }
3923 };
3924 if m.command == "error" {
3925 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3926 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3927 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3928 }
3929 let duration = timeout.unwrap_or_else(|| self.get_default_timeout());
3933 let json = match tokio::time::timeout(duration, rx).await {
3935 Ok(Ok(val)) => {
3936 let _ = self.unregister_queue(&q).await;
3938 val
3939 },
3940 Ok(Err(e)) => {
3941 let _ = self.unregister_queue(&q).await;
3942 return Err(OpenIAPError::CustomError(e.to_string()));
3943 },
3944 Err(_) => {
3945 let _ = self.unregister_queue(&q).await;
3946 return Err(OpenIAPError::ServerError("Timeout".to_string()));
3947 },
3948 };
3949 debug!("Received json result: {:?}", json);
3950 let obj = serde_json::from_str::<serde_json::Value>(&json).unwrap();
3951 let command: String = obj["command"].as_str().unwrap().to_string();
3952 let mut data = "".to_string();
3953 if obj["data"].as_str().is_some() {
3954 data = obj["data"].as_str().unwrap().to_string();
3955 } else if obj["data"].as_object().is_some() {
3956 data = obj["data"].to_string();
3957 }
3958 if !command.eq("invokecompleted") {
3959 if command.eq("timeout") {
3960 return Err(OpenIAPError::ServerError("Timeout".to_string()));
3961 } else {
3962 if data.is_empty() {
3963 return Err(OpenIAPError::ServerError(
3964 "Error with no message".to_string(),
3965 ));
3966 }
3967 return Err(OpenIAPError::ServerError(data));
3968 }
3969 }
3970 Ok(data)
3980 }
3981 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3982 }
3983 }
3984}
3985