1#![warn(missing_docs)]
2pub use openiap_proto::errors::*;
25pub use openiap_proto::protos::*;
26pub use openiap_proto::*;
27pub use prost_types::Timestamp;
28pub use protos::flow_service_client::FlowServiceClient;
29use sqids::Sqids;
30
31use tokio::task::JoinHandle;
32use tokio_tungstenite::WebSocketStream;
33use tracing::{debug, error, info, trace};
34type StdError = Box<dyn std::error::Error + Send + Sync + 'static>;
35type Result<T, E = StdError> = ::std::result::Result<T, E>;
36use std::fs::File;
37use std::io::{Read, Write};
38use std::sync::atomic::{AtomicUsize, Ordering};
39use std::sync::Arc;
40use tokio::sync::Mutex;
41use tonic::transport::Channel;
42
43use tokio::sync::{mpsc, oneshot};
44
45use std::env;
46use std::time::Duration;
47
48#[cfg(feature = "otel")]
49mod otel;
50mod tests;
51mod ws;
52mod grpc;
53mod util;
54pub use crate::util::{set_otel_url, enable_tracing, disable_tracing};
55pub use crate::otel::{set_f64_observable_gauge, set_u64_observable_gauge, set_i64_observable_gauge, disable_observable_gauge};
56
57type QuerySender = oneshot::Sender<Envelope>;
58type StreamSender = mpsc::Sender<Vec<u8>>;
59type Sock = WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>;
60use futures::StreamExt;
61use async_channel::unbounded;
62const VERSION: &str = "0.0.32";
63
64
65#[derive(Clone)]
68pub struct Client {
69 connect_called: Arc<std::sync::Mutex<bool>>,
71 runtime: Arc<std::sync::Mutex<Option<tokio::runtime::Runtime>>>,
73
74 stats: Arc<std::sync::Mutex<ClientStatistics>>,
76
77 task_handles: Arc<std::sync::Mutex<Vec<JoinHandle<()>>>>,
78 pub client: Arc<std::sync::Mutex<ClientEnum>>,
80 user: Arc<std::sync::Mutex<Option<User>>>,
82 pub inner: Arc<Mutex<ClientInner>>,
84 pub config: Arc<std::sync::Mutex<Option<Config>>>,
86 pub auto_reconnect: Arc<std::sync::Mutex<bool>>,
88 pub url: Arc<std::sync::Mutex<String>>,
90 pub username: Arc<std::sync::Mutex<String>>,
92 pub password: Arc<std::sync::Mutex<String>>,
94 pub jwt: Arc<std::sync::Mutex<String>>,
96 service_name: Arc<std::sync::Mutex<String>>,
97 agent_name: Arc<std::sync::Mutex<String>>,
98 agent_version: Arc<std::sync::Mutex<String>>,
99 event_sender: async_channel::Sender<ClientEvent>,
100 event_receiver: async_channel::Receiver<ClientEvent>,
101 out_envelope_sender: async_channel::Sender<Envelope>,
102 out_envelope_receiver: async_channel::Receiver<Envelope>,
103 rpc_reply_queue: Arc<tokio::sync::Mutex<Option<String>>>,
104 rpc_callback: Arc<tokio::sync::Mutex<Option<QueueCallbackFn>>>,
105
106 pub state: Arc<std::sync::Mutex<ClientState>>,
108 pub msgcount: Arc<std::sync::Mutex<i32>>,
110 pub reconnect_ms: Arc<std::sync::Mutex<i32>>,
112 pub default_timeout: Arc<std::sync::Mutex<tokio::time::Duration>>,
114}
115#[derive(Clone, Default)]
117pub struct ClientStatistics {
118 connection_attempts: u64,
119 connections: u64,
120 package_tx: u64,
121 package_rx: u64,
122 signin: u64,
123 download: u64,
124 getdocumentversion: u64,
125 customcommand: u64,
126 listcollections: u64,
127 createcollection: u64,
128 dropcollection: u64,
129 ensurecustomer: u64,
130 invokeopenrpa: u64,
131 registerqueue: u64,
132 registerexchange: u64,
133 unregisterqueue: u64,
134 watch: u64,
135 unwatch: u64,
136 queuemessage: u64,
137 pushworkitem: u64,
138 pushworkitems: u64,
139 popworkitem: u64,
140 updateworkitem: u64,
141 deleteworkitem: u64,
142 addworkitemqueue: u64,
143 updateworkitemqueue: u64,
144 deleteworkitemqueue: u64,
145 getindexes: u64,
146 createindex: u64,
147 dropindex: u64,
148 upload: u64,
149 query: u64,
150 count: u64,
151 distinct: u64,
152 aggregate: u64,
153 insertone: u64,
154 insertmany: u64,
155 insertorupdateone: u64,
156 insertorupdatemany: u64,
157 updateone: u64,
158 updatedocument: u64,
159 deleteone: u64,
160 deletemany: u64,
161}
162use futures::future::BoxFuture;
164type QueueCallbackFn =
165 Arc<dyn Fn(Arc<Client>, QueueEvent) -> BoxFuture<'static, Option<String>> + Send + Sync>;
166
167#[derive(Clone)]
170pub struct ClientInner {
171 pub queries: Arc<Mutex<std::collections::HashMap<String, QuerySender>>>,
173 pub streams: Arc<Mutex<std::collections::HashMap<String, StreamSender>>>,
175 #[allow(clippy::type_complexity)]
177 pub watches:
178 Arc<Mutex<std::collections::HashMap<String, Box<dyn Fn(WatchEvent) + Send + Sync>>>>,
179 #[allow(clippy::type_complexity)]
181 pub queues:
182 Arc<Mutex<std::collections::HashMap<String, QueueCallbackFn>>>,
183}
184#[derive(Clone, Debug)]
186pub enum ClientEnum {
187 None,
189 Grpc(FlowServiceClient<tonic::transport::Channel>),
191 WS(Arc<Mutex<Sock>>)
193}
194#[derive(Debug, Clone, PartialEq)]
196pub enum ClientEvent {
197 Connecting,
199 Connected,
201 Disconnected(String),
203 SignedIn,
205 }
218#[derive(Debug, Clone, PartialEq)]
220pub enum ClientState {
221 Disconnected,
223 Connecting,
225 Connected,
227 Signedin
229}
230#[derive(Debug, Clone, serde::Deserialize)]
232#[allow(dead_code)]
233pub struct Config {
234 #[serde(default)]
235 wshost: String,
236 #[serde(default)]
237 wsurl: String,
238 #[serde(default)]
239 domain: String,
240 #[serde(default)]
241 auto_create_users: bool,
242 #[serde(default)]
243 namespace: String,
244 #[serde(default)]
245 agent_domain_schema: String,
246 #[serde(default)]
247 version: String,
248 #[serde(default)]
249 validate_emails: bool,
250 #[serde(default)]
251 forgot_pass_emails: bool,
252 #[serde(default)]
253 supports_watch: bool,
254 #[serde(default)]
255 amqp_enabled_exchange: bool,
256 #[serde(default)]
257 multi_tenant: bool,
258 #[serde(default)]
259 enable_entity_restriction: bool,
260 #[serde(default)]
261 enable_web_tours: bool,
262 #[serde(default)]
263 collections_with_text_index: Vec<String>,
264 #[serde(default)]
265 timeseries_collections: Vec<String>,
266 #[serde(default)]
267 ping_clients_interval: i32,
268 #[serde(default)]
269 validlicense: bool,
270 #[serde(default)]
271 forceddomains: Vec<String>,
272 #[serde(default)]
273 grafana_url: String,
274 #[serde(default)]
275 otel_metric_url: String,
276 #[serde(default)]
277 otel_trace_url: String,
278 #[serde(default)]
279 otel_log_url: String,
280 #[serde(default)]
281 enable_analytics: bool,
282}
283impl std::fmt::Debug for ClientInner {
284 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
285 f.debug_struct("ClientInner")
286 .field("queries", &self.queries)
288 .field("streams", &self.streams)
289 .finish()
290 }
291}
292impl Default for Client {
293 fn default() -> Self {
294 Self::new()
295 }
296}
297
298impl Client {
299 pub fn new() -> Self {
301 let (ces, cer) = unbounded::<ClientEvent>();
302 let (out_es, out_er) = unbounded::<Envelope>();
303 Self {
304 task_handles: Arc::new(std::sync::Mutex::new(Vec::new())),
305 stats: Arc::new(std::sync::Mutex::new(ClientStatistics::default())),
306 user: Arc::new(std::sync::Mutex::new(None)),
307 client: Arc::new(std::sync::Mutex::new(ClientEnum::None)),
308 connect_called: Arc::new(std::sync::Mutex::new(false)),
309 runtime: Arc::new(std::sync::Mutex::new(None)),
310 msgcount: Arc::new(std::sync::Mutex::new(-1)),
311 reconnect_ms: Arc::new(std::sync::Mutex::new(1000)),
312 rpc_reply_queue: Arc::new(tokio::sync::Mutex::new(None)),
313 rpc_callback: Arc::new(tokio::sync::Mutex::new(None)),
314 inner: Arc::new(Mutex::new(ClientInner {
315 queries: Arc::new(Mutex::new(std::collections::HashMap::new())),
316 streams: Arc::new(Mutex::new(std::collections::HashMap::new())),
317 watches: Arc::new(Mutex::new(std::collections::HashMap::new())),
318 queues: Arc::new(Mutex::new(std::collections::HashMap::new())),
319 })),
320 config: Arc::new(std::sync::Mutex::new(None)),
321 auto_reconnect: Arc::new(std::sync::Mutex::new(true)),
322 url: Arc::new(std::sync::Mutex::new("".to_string())),
323 username: Arc::new(std::sync::Mutex::new("".to_string())),
324 password: Arc::new(std::sync::Mutex::new("".to_string())),
325 jwt: Arc::new(std::sync::Mutex::new("".to_string())),
326 service_name: Arc::new(std::sync::Mutex::new("rust".to_string())),
327 agent_name: Arc::new(std::sync::Mutex::new("rust".to_string())),
328 agent_version: Arc::new(std::sync::Mutex::new(VERSION.to_string())),
329 event_sender: ces,
330 event_receiver: cer,
331 out_envelope_sender: out_es,
332 out_envelope_receiver: out_er,
333 state: Arc::new(std::sync::Mutex::new(ClientState::Disconnected)),
334 default_timeout: Arc::new(std::sync::Mutex::new(Duration::from_secs(30))),
335 }
336 }
337 #[tracing::instrument(skip_all)]
339 pub fn connect(&self, dst: &str) -> Result<(), OpenIAPError> {
340 let rt = match tokio::runtime::Runtime::new() {
341 Ok(rt) => rt,
342 Err(e) => {
343 return Err(OpenIAPError::ClientError(format!(
344 "Failed to create tokio runtime: {}",
345 e
346 )));
347 }
348 };
349 self.set_runtime(Some(rt));
350 tokio::task::block_in_place(|| {
351 let handle = self.get_runtime_handle();
352 handle.block_on(self.connect_async(dst))
353 })
354 }
355
356 #[allow(unused_variables)]
358 pub async fn load_config(&self, strurl: &str, url: &url::Url) -> Option<Config> {
359 let config: Option<Config>;
360 let issecure = url.scheme() == "https" || url.scheme() == "wss" || url.port() == Some(443);
361 let mut port = url.port().unwrap_or(80);
362 if issecure {
363 port = 443;
364 }
365 let mut host = url.host_str().unwrap_or("localhost.openiap.io").replace("grpc.", "");
366 if host.starts_with("api-grpc") {
367 host = "api".to_string();
368 }
369 if port == 50051 {
370 port = 3000;
371 }
372 let configurl = if issecure {
373 format!(
374 "{}://{}:{}/config",
375 "https",
376 host,
377 port
378 )
379 } else {
380 format!(
381 "{}://{}:{}/config",
382 "http",
383 host,
384 port
385 )
386 };
387
388 let configurl = url::Url::parse(configurl.as_str())
389 .map_err(|e| OpenIAPError::ClientError(format!("Failed to parse URL: {}", e))).expect("wefew");
390 trace!("Getting config from: {}", configurl);
391 let o = minreq::get(configurl).with_timeout(5).send();
392 match o {
393 Ok(_) => {
394 let response = match o {
395 Ok(response) => response,
396 Err(e) => {
397 error!("Failed to get config: {}", e);
398 return None;
399 }
400 };
401 if response.status_code == 200 {
402 let body = response.as_str().unwrap();
403 config = Some(match serde_json::from_str(body) {
404 Ok(config) => config,
405 Err(e) => {
406 error!("Failed to parse config: {}", e);
407 return None;
408 }
409 });
410 } else {
411 config = None;
412 }
413 }
414 Err(e) => {
415 error!("Failed to get config: {}", e);
416 return None;
417 }
418 }
419 let mut _enable_analytics = true;
420 let mut _otel_metric_url = std::env::var("OTEL_METRIC_URL").unwrap_or_default();
421 let mut _otel_trace_url = std::env::var("OTEL_TRACE_URL").unwrap_or_default();
422 let mut _otel_log_url = std::env::var("OTEL_LOG_URL").unwrap_or_default();
423 let mut apihostname = url.host_str().unwrap_or("localhost.openiap.io").to_string();
424 if apihostname.starts_with("grpc.") {
425 apihostname = apihostname[5..].to_string();
426 }
427
428 if config.is_some() {
429 let config = config.as_ref().unwrap();
430 if !config.otel_metric_url.is_empty() {
431 _otel_metric_url = config.otel_metric_url.clone();
432 }
433 if !config.otel_trace_url.is_empty() {
434 _otel_trace_url = config.otel_trace_url.clone();
435 }
436 if !config.otel_log_url.is_empty() {
437 _otel_log_url = config.otel_log_url.clone();
438 }
439 if !config.domain.is_empty() {
440 apihostname = config.domain.clone();
441 }
442 _enable_analytics = config.enable_analytics;
443 }
444 #[cfg(feature = "otel")]
445 if _enable_analytics {
446 let service_name = self.get_service_name();
447 let agent_name = self.get_agent_name();
448 let agent_version = self.get_agent_version();
449 match otel::init_telemetry(&service_name, &agent_name, &agent_version, VERSION, &apihostname, _otel_metric_url.as_str(),
450 _otel_trace_url.as_str(), _otel_log_url.as_str(),
451 &self.stats) {
452 Ok(_) => (),
453 Err(e) => {
454 error!("Failed to initialize telemetry: {}", e);
455 return None;
456 }
457 }
458 }
459 config
460 }
461
462 #[tracing::instrument(skip_all)]
464 pub async fn connect_async(&self, dst: &str) -> Result<(), OpenIAPError> {
465 #[cfg(test)]
466 {
467 enable_tracing("openiap=error", "");
471 }
473 if self.is_connect_called() {
474 self.set_auto_reconnect(true);
475 return self.reconnect().await;
476 }
477 let mut strurl = dst.to_string();
478 if strurl.is_empty() {
479 strurl = std::env::var("apiurl").unwrap_or("".to_string());
480 if strurl.is_empty() {
481 strurl = std::env::var("grpcapiurl").unwrap_or("".to_string());
482 }
483 if strurl.is_empty() {
484 strurl = std::env::var("wsapiurl").unwrap_or("".to_string());
485 }
486 }
487 if strurl.is_empty() {
488 return Err(OpenIAPError::ClientError("No URL provided".to_string()));
489 }
490 let url = url::Url::parse(strurl.as_str())
491 .map_err(|e| OpenIAPError::ClientError(format!("Failed to parse URL: {}", e)))?;
492 let mut username = "".to_string();
493 let mut password = "".to_string();
494 if let Some(p) = url.password() {
495 password = p.to_string();
496 }
497 if !url.username().is_empty() {
498 username = url.username().to_string();
499 }
500 if !username.is_empty() && !password.is_empty() {
501 self.set_username(&username);
502 self.set_password(&password);
503 }
504 let usegprc = url.scheme() == "grpc" || url.domain().unwrap_or("localhost").to_lowercase().starts_with("grpc.") || url.port() == Some(50051);
505 if url.scheme() != "http"
506 && url.scheme() != "https"
507 && url.scheme() != "grpc"
508 && url.scheme() != "ws"
509 && url.scheme() != "wss"
510 {
511 return Err(OpenIAPError::ClientError("Invalid URL scheme".to_string()));
512 }
513 if url.scheme() == "grpc" {
514 if url.port() == Some(443) {
515 strurl = format!("https://{}", url.host_str().unwrap_or("app.openiap.io"));
516 } else {
517 strurl = format!("http://{}:{}", url.host_str().unwrap_or("app.openiap.io"), url.port().unwrap_or(80));
518 }
519 }
520 let url = url::Url::parse(strurl.as_str())
521 .map_err(|e| OpenIAPError::ClientError(format!("Failed to parse URL: {}", e)))?;
522 if url.port().is_none() {
523 strurl = format!(
524 "{}://{}",
525 url.scheme(),
526 url.host_str().unwrap_or("app.openiap.io")
527 );
528 } else {
529 strurl = format!(
530 "{}://{}:{}",
531 url.scheme(),
532 url.host_str().unwrap_or("localhost.openiap.io"),
533 url.port().unwrap_or(80)
534 );
535 }
536 debug!("Connecting to {}", strurl);
537 let config = self.load_config(strurl.as_str(), &url).await;
538 if !usegprc {
539 strurl = format!("{}/ws/v2", strurl);
540
541 let (_stream_tx, stream_rx) = mpsc::channel(60);
542
543 let socket = match tokio_tungstenite::connect_async(strurl.clone()).await {
544 Ok((socket, _)) => socket,
545 Err(e) => {
546 return Err(OpenIAPError::ClientError(format!(
547 "Failed to connect to WS: {}",
548 e
549 )));
550 }
551 };
552 self.set_client(ClientEnum::WS(Arc::new(Mutex::new(socket))));
553 self.set_connect_called(true);
554 self.set_config(config);
555 self.set_url(&strurl);
556 match self.setup_ws(&strurl).await {
557 Ok(_) => (),
558 Err(e) => {
559 return Err(OpenIAPError::ClientError(format!(
560 "Failed to setup WS: {}",
561 e
562 )));
563 }
564 }
565 let client2 = self.clone();
566 tokio::task::spawn(async move {
568 tokio_stream::wrappers::ReceiverStream::new(stream_rx)
569 .for_each(|envelope: Envelope| async {
570 let command = envelope.command.clone();
571 let rid = envelope.rid.clone();
572 let id = envelope.id.clone();
573 trace!("Received command: {}, id: {}, rid: {}", command, id, rid);
574 client2.parse_incomming_envelope(envelope).await;
575 })
576 .await;
577 }); } else {
579 if url.scheme() == "http" {
580 let response = Client::connect_grpc(strurl.clone()).await;
581 match response {
582 Ok(client) => {
583 self.set_client(ClientEnum::Grpc(client));
584 }
585 Err(e) => {
586 return Err(OpenIAPError::ClientError(format!(
587 "Failed to connect: {}",
588 e
589 )));
590 }
591 }
592 } else {
593 let uri = tonic::transport::Uri::builder()
594 .scheme(url.scheme())
595 .authority(url.host().unwrap().to_string())
596 .path_and_query("/")
597 .build();
598 let uri = match uri {
599 Ok(uri) => uri,
600 Err(e) => {
601 return Err(OpenIAPError::ClientError(format!(
602 "Failed to build URI: {}",
603 e
604 )));
605 }
606 };
607 let channel = Channel::builder(uri)
608 .tls_config(tonic::transport::ClientTlsConfig::new().with_native_roots());
609 let channel = match channel {
610 Ok(channel) => channel,
611 Err(e) => {
612 return Err(OpenIAPError::ClientError(format!(
613 "Failed to build channel: {}",
614 e
615 )));
616 }
617 };
618 let channel = channel.connect().await;
619 let channel = match channel {
620 Ok(channel) => channel,
621 Err(e) => {
622 return Err(OpenIAPError::ClientError(format!(
623 "Failed to connect: {}",
624 e
625 )));
626 }
627 };
628 self.set_client(ClientEnum::Grpc(FlowServiceClient::new(channel)));
629 }
630 self.set_connect_called(true);
631 self.set_config(config);
632 self.set_url(&strurl);
633 self.setup_grpc_stream().await?;
634 };
635 self.post_connected().await
636 }
637
638 #[tracing::instrument(skip_all)]
667 pub async fn new_connect(dst: &str) -> Result<Self, OpenIAPError> {
668 #[cfg(test)]
669 {
670 enable_tracing("openiap=error", "");
671 }
672 let client = Client::new();
673 client.connect_async(dst).await?;
674 Ok(client)
675 }
676 pub async fn post_connected(&self) -> Result<(), OpenIAPError> {
678 if self.get_username().is_empty() && self.get_password().is_empty() {
679 self.set_username(&std::env::var("OPENIAP_USERNAME").unwrap_or_default());
680 self.set_password(&std::env::var("OPENIAP_PASSWORD").unwrap_or_default());
681 }
682 if !self.get_username().is_empty() && !self.get_password().is_empty() {
683 debug!("Signing in with username: {}", self.get_username());
684 let signin = SigninRequest::with_userpass(self.get_username().as_str(), self.get_password().as_str());
685 let loginresponse = self.signin(signin).await;
686 match loginresponse {
687 Ok(response) => {
688 self.reset_reconnect_ms();
689 self.set_connected(ClientState::Connected, None);
690 info!("Signed in as {}", response.user.as_ref().unwrap().username);
691 Ok(())
692 }
693 Err(_e) => {
694 self.set_connected(ClientState::Disconnected, Some(&_e.to_string()));
695 Err(_e)
696 }
697 }
698 } else {
699 self.set_jwt(&std::env::var("OPENIAP_JWT").unwrap_or_default());
700 if self.get_jwt().is_empty() {
701 self.set_jwt(&std::env::var("jwt").unwrap_or_default());
702 }
703 if !self.get_jwt().is_empty() {
704 debug!("Signing in with JWT");
705 let signin = SigninRequest::with_jwt(self.get_jwt().as_str());
706 let loginresponse = self.signin(signin).await;
707 match loginresponse {
708 Ok(response) => match response.user {
709 Some(user) => {
710 self.reset_reconnect_ms();
711 info!("Signed in as {}", user.username);
712 self.set_connected(ClientState::Connected, None);
713 Ok(())
714 }
715 None => {
716 self.reset_reconnect_ms();
717 info!("Signed in as guest");
718 self.set_connected(ClientState::Connected, None);
719 Ok(())
720 }
722 },
723 Err(_e) => {
724 self.set_connected(ClientState::Disconnected, Some(&_e.to_string()));
725 Err(_e)
726 }
727 }
728 } else {
729 self.reset_reconnect_ms();
730 match self.get_element().await {
731 Ok(_) => {
732 debug!("Connected, No credentials provided so is running as guest");
733 self.set_connected(ClientState::Connected, None);
734 Ok(())
735 },
736 Err(e) => {
737 self.set_connected(ClientState::Disconnected, Some(&e.to_string()));
738 Err(e)
739 }
740 }
741 }
742 }
743 }
744 #[tracing::instrument(skip_all)]
746 pub async fn reconnect(&self) -> Result<(), OpenIAPError> {
747 let state = self.get_state();
748 if state == ClientState::Connected || state == ClientState::Signedin {
749 return Ok(());
750 }
751 if !self.is_auto_reconnect() {
752 return Ok(());
753 }
754 let client = self.get_client();
755
756 match client {
757 ClientEnum::WS(ref _client) => {
758 info!("Reconnecting to {} ({} ms)", self.get_url(), (self.get_reconnect_ms() - 500));
759 self.setup_ws(&self.get_url()).await?;
760 debug!("Completed reconnecting to websocket");
761 self.post_connected().await
762 }
763 ClientEnum::Grpc(ref _client) => {
764 info!("Reconnecting to {} ({} ms)", self.get_url(), (self.get_reconnect_ms() - 500));
765 match self.setup_grpc_stream().await {
766 Ok(_) => {
767 debug!("Completed reconnecting to gRPC");
768 self.post_connected().await
769 },
770 Err(e) => {
771 return Err(OpenIAPError::ClientError(format!(
772 "Failed to setup gRPC stream: {}",
773 e
774 )));
775 }
776 }
777 }
778 ClientEnum::None => {
779 return Err(OpenIAPError::ClientError("Invalid client".to_string()));
780 }
781 }
782 }
783 pub fn disconnect(&self) {
785 self.set_auto_reconnect(false);
786 self.set_connected(ClientState::Disconnected, Some("Disconnected"));
787 }
788 pub fn set_connected(&self, state: ClientState, message: Option<&str>) {
790 {
791 let current = self.get_state();
792 trace!("Set connected: {:?} from {:?}", state, current);
793 if state == ClientState::Connected && current == ClientState::Signedin {
794 self.set_state(ClientState::Signedin);
795 } else {
796 self.set_state(state.clone());
797 }
798 if state == ClientState::Disconnected && !current.eq(&state) {
799 let me = self.clone();
800 tokio::task::spawn(async move {
801 let mut reply_queue_guard = me.rpc_reply_queue.lock().await;
802 let mut callback_guard = me.rpc_callback.lock().await;
803 *reply_queue_guard = None;
804 *callback_guard = None;
805 });
806 }
807 if state == ClientState::Connecting && !current.eq(&state) {
808 if let Ok(_handle) = tokio::runtime::Handle::try_current() {
809 self.stats.lock().unwrap().connection_attempts += 1;
810 let me = self.clone();
811 tokio::task::spawn(async move {
812 me.event_sender.send(crate::ClientEvent::Connecting).await.unwrap();
813 });
814 }
823
824 }
825 if (state == ClientState::Connected|| state == ClientState::Signedin) && (current == ClientState::Disconnected || current == ClientState::Connecting) {
826 if let Ok(_handle) = tokio::runtime::Handle::try_current() {
827 self.stats.lock().unwrap().connections += 1;
828 let me = self.clone();
829 tokio::task::spawn(async move {
830 me.event_sender.send(crate::ClientEvent::Connected).await.unwrap();
831 });
832 }
841 }
842 if state == ClientState::Signedin && current != ClientState::Signedin {
843 if let Ok(_handle) = tokio::runtime::Handle::try_current() {
844 let me = self.clone();
845 tokio::task::spawn(async move {
846 me.event_sender.send(crate::ClientEvent::SignedIn).await.unwrap();
847 });
848 }
857 }
858 if state == ClientState::Disconnected && !current.eq(&state) {
859 if message.is_some() {
860 debug!("Disconnected: {}", message.unwrap());
861 } else {
862 debug!("Disconnected");
863 }
864 if let Ok(_handle) = tokio::runtime::Handle::try_current() {
865 let me = self.clone();
866 let message = match message {
867 Some(message) => message.to_string(),
868 None => "".to_string(),
869 };
870 tokio::task::spawn(async move {
872 me.event_sender.send(crate::ClientEvent::Disconnected(message)).await.unwrap();
873 });
874 }
885
886 self.kill_handles();
887 if let Ok(_handle) = tokio::runtime::Handle::try_current() {
888 let client = self.clone();
889 tokio::task::spawn(async move {
891 {
892 let inner = client.inner.lock().await;
893 let mut queries = inner.queries.lock().await;
894 let ids = queries.keys().cloned().collect::<Vec<String>>();
895 debug!("********************************************** Cleaning up");
896 for id in ids {
897 let err = ErrorResponse {
898 code: 500,
899 message: "Disconnected".to_string(),
900 stack: "".to_string(),
901 };
902 let envelope = err.to_envelope();
903 let tx = queries.remove(&id).unwrap();
904 debug!("kill query: {}", id);
905 let _ = tx.send(envelope);
906 }
907 let mut streams = inner.streams.lock().await;
908 let ids = streams.keys().cloned().collect::<Vec<String>>();
909 for id in ids {
910 let tx = streams.remove(&id).unwrap();
911 debug!("kill stream: {}", id);
912 let _ = tx.send(Vec::new()).await;
913 }
914 let mut queues = inner.queues.lock().await;
915 let ids = queues.keys().cloned().collect::<Vec<String>>();
916 for id in ids {
917 let _ = queues.remove(&id).unwrap();
918 }
919 let mut watches = inner.watches.lock().await;
920 let ids = watches.keys().cloned().collect::<Vec<String>>();
921 for id in ids {
922 let _ = watches.remove(&id).unwrap();
923 }
924 debug!("**********************************************************");
925 }
926 if client.is_auto_reconnect() {
927 trace!("Reconnecting in {} seconds", client.get_reconnect_ms() / 1000);
928 tokio::time::sleep(Duration::from_millis(client.get_reconnect_ms() as u64)).await;
929 if client.is_auto_reconnect() {
930 client.inc_reconnect_ms();
931 trace!("Reconnecting . . .");
933 client.reconnect().await.unwrap_or_else(|e| {
934 error!("Failed to reconnect: {}", e);
935 client.set_connected(ClientState::Disconnected, Some(&e.to_string()));
936 });
937 } else {
938 debug!("Not reconnecting");
939 }
940 } else {
941 debug!("Reconnecting disabled, stop now");
942 }
943 });
944 }
951
952 }
953 }
954 }
955 pub fn get_state(&self) -> ClientState {
957 let conn = self.state.lock().unwrap();
958 conn.clone()
959 }
960 pub fn set_state(&self, state: ClientState) {
962 let mut conn = self.state.lock().unwrap();
963 *conn = state;
964 }
965 pub fn set_msgcount(&self, msgcount: i32) {
967 let mut current = self.msgcount.lock().unwrap();
968 trace!("Set msgcount: {} from {}", msgcount, *current);
969 *current = msgcount;
970 }
971 pub fn inc_msgcount(&self) -> i32 {
973 let mut current = self.msgcount.lock().unwrap();
974 *current += 1;
975 *current
976 }
977 pub fn get_reconnect_ms(&self) -> i32 {
979 let reconnect_ms = self.reconnect_ms.lock().unwrap();
980 *reconnect_ms
981 }
982 pub fn reset_reconnect_ms(&self) {
984 let mut current = self.reconnect_ms.lock().unwrap();
985 *current = 500;
986 }
987 pub fn inc_reconnect_ms(&self) -> i32 {
989 let mut current = self.reconnect_ms.lock().unwrap();
990 if *current < 30000 {
991 *current += 500;
992 }
993 *current
994 }
995
996 pub fn push_handle(&self, handle: tokio::task::JoinHandle<()>) {
998 let mut handles = self.task_handles.lock().unwrap();
999 handles.push(handle);
1000 }
1001 pub fn kill_handles(&self) {
1003 let mut handles = self.task_handles.lock().unwrap();
1004 for handle in handles.iter() {
1005 debug!("Killing handle");
1008 if !handle.is_finished() {
1009 handle.abort();
1010 }
1011 }
1012 handles.clear();
1013 }
1020
1021
1022 #[tracing::instrument(skip_all)]
1024 fn get_msgcount(&self) -> i32 {
1025 let msgcount = self.msgcount.lock().unwrap();
1026 *msgcount
1027 }
1028 pub fn set_default_timeout(&self, timeout: Duration) {
1030 let mut current = self.default_timeout.lock().unwrap();
1031 trace!("Set default_timeout: {} from {:?}", timeout.as_secs(), current.as_secs());
1032 *current = timeout;
1033 }
1034 pub fn get_default_timeout(&self) -> Duration {
1036 let current = self.default_timeout.lock().unwrap();
1037 current.clone()
1038 }
1039 #[tracing::instrument(skip_all)]
1041 pub fn set_connect_called(&self, connect_called: bool) {
1042 let mut current = self.connect_called.lock().unwrap();
1043 trace!("Set connect_called: {} from {}", connect_called, *current);
1044 *current = connect_called;
1045 }
1046 #[tracing::instrument(skip_all)]
1048 fn is_connect_called(&self) -> bool {
1049 let connect_called = self.connect_called.lock().unwrap();
1050 *connect_called
1051 }
1052 #[tracing::instrument(skip_all)]
1054 pub fn set_auto_reconnect(&self, auto_reconnect: bool) {
1055 let mut current = self.auto_reconnect.lock().unwrap();
1056 trace!("Set auto_reconnect: {} from {}", auto_reconnect, *current);
1057 *current = auto_reconnect;
1058 }
1059 #[tracing::instrument(skip_all)]
1061 fn is_auto_reconnect(&self) -> bool {
1062 let auto_reconnect = self.auto_reconnect.lock().unwrap();
1063 *auto_reconnect
1064 }
1065 #[tracing::instrument(skip_all)]
1067 pub fn set_url(&self, url: &str) {
1068 let mut current = self.url.lock().unwrap();
1069 trace!("Set url: {} from {}", url, *current);
1070 *current = url.to_string();
1071 }
1072 #[tracing::instrument(skip_all)]
1074 fn get_url(&self) -> String {
1075 let url = self.url.lock().unwrap();
1076 url.to_string()
1077 }
1078 #[tracing::instrument(skip_all)]
1080 pub fn set_username(&self, username: &str) {
1081 let mut current = self.username.lock().unwrap();
1082 trace!("Set username: {} from {}", username, *current);
1083 *current = username.to_string();
1084 }
1085 #[tracing::instrument(skip_all)]
1087 fn get_username(&self) -> String {
1088 let username = self.username.lock().unwrap();
1089 username.to_string()
1090 }
1091 #[tracing::instrument(skip_all)]
1093 pub fn set_password(&self, password: &str) {
1094 let mut current = self.password.lock().unwrap();
1095 trace!("Set password: {} from {}", password, *current);
1096 *current = password.to_string();
1097 }
1098 #[tracing::instrument(skip_all)]
1100 fn get_password(&self) -> String {
1101 let password = self.password.lock().unwrap();
1102 password.to_string()
1103 }
1104 #[tracing::instrument(skip_all)]
1106 pub fn set_jwt(&self, jwt: &str) {
1107 let mut current = self.jwt.lock().unwrap();
1108 trace!("Set jwt: {} from {}", jwt, *current);
1109 *current = jwt.to_string();
1110 }
1111 #[tracing::instrument(skip_all)]
1113 fn get_jwt(&self) -> String {
1114 let jwt = self.jwt.lock().unwrap();
1115 jwt.to_string()
1116 }
1117
1118 #[tracing::instrument(skip_all)]
1120 pub fn set_service_name(&self, service_name: &str) {
1121 let mut current = self.service_name.lock().unwrap();
1122 trace!("Set servicename: {} from {}", service_name, *current);
1123 *current = service_name.to_string();
1124 }
1125 #[tracing::instrument(skip_all)]
1127 pub fn get_service_name(&self) -> String {
1128 let servicename = self.service_name.lock().unwrap();
1129 servicename.to_string()
1130 }
1131 #[tracing::instrument(skip_all)]
1133 pub fn set_agent_name(&self, agent: &str) {
1134 let mut current = self.agent_name.lock().unwrap();
1135 trace!("Set agent: {} from {}", agent, *current);
1136 *current = agent.to_string();
1137 }
1138 #[tracing::instrument(skip_all)]
1140 pub fn get_agent_name(&self) -> String {
1141 let agent = self.agent_name.lock().unwrap();
1142 agent.to_string()
1143 }
1144 #[tracing::instrument(skip_all)]
1146 pub fn set_agent_version(&self, version: &str) {
1147 let mut current = self.agent_version.lock().unwrap();
1148 trace!("Set agent version: {} from {}", version, *current);
1149 *current = version.to_string();
1150 }
1151 #[tracing::instrument(skip_all)]
1153 pub fn get_agent_version(&self) -> String {
1154 let agent_version = self.agent_version.lock().unwrap();
1155 agent_version.to_string()
1156 }
1157
1158 #[tracing::instrument(skip_all)]
1160 pub fn set_config(&self, config: Option<Config>) {
1161 let mut current = self.config.lock().unwrap();
1162 *current = config;
1163 }
1164 #[tracing::instrument(skip_all)]
1166 pub fn get_config(&self) -> Option<Config> {
1167 let config = self.config.lock().unwrap();
1168 config.clone()
1169 }
1170 #[tracing::instrument(skip_all)]
1172 pub fn set_client(&self, client: ClientEnum) {
1173 let mut current = self.client.lock().unwrap();
1174 *current = client;
1175 }
1176 #[tracing::instrument(skip_all)]
1178 fn get_client(&self) -> ClientEnum {
1179 let client = self.client.lock().unwrap();
1180 client.clone()
1181 }
1182 #[tracing::instrument(skip_all)]
1184 pub fn set_user(&self, user: Option<User>) {
1185 let mut current = self.user.lock().unwrap();
1186 *current = user;
1187 }
1188 #[tracing::instrument(skip_all)]
1190 pub fn get_user(&self) -> Option<User> {
1191 let user = self.user.lock().unwrap();
1192 user.clone()
1193 }
1194 #[tracing::instrument(skip_all)]
1204 pub fn set_runtime(&self, runtime: Option<tokio::runtime::Runtime>) {
1205 let mut current = self.runtime.lock().unwrap();
1206 *current = runtime;
1207 }
1208 #[tracing::instrument(skip_all)]
1210 pub fn get_runtime(&self) -> &std::sync::Mutex<std::option::Option<tokio::runtime::Runtime>> {
1212 self.runtime.as_ref()
1213 }
1214 #[tracing::instrument(skip_all)]
1216 pub fn get_runtime_handle(&self) -> tokio::runtime::Handle {
1217 let mut rt = self.runtime.lock().unwrap();
1218 if rt.is_none() {
1219 let runtime = tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime");
1221 *rt = Some(runtime);
1222 } else {
1223 }
1225 rt.as_ref().unwrap().handle().clone()
1226 }
1227 #[tracing::instrument(skip_all)]
1229 pub async fn on_event(&self, callback: Box<dyn Fn(ClientEvent) + Send + Sync>)
1230 {
1231 let event_receiver = self.event_receiver.clone();
1233 let callback = callback;
1234 let _handle = tokio::task::spawn(async move {
1235 while let Ok(event) = event_receiver.recv().await {
1236 callback(event);
1237 }
1238 }); }
1240 #[tracing::instrument(skip_all)]
1242 pub fn get_uniqueid() -> String {
1243 static COUNTER: AtomicUsize = AtomicUsize::new(1);
1244 let num1 = COUNTER.fetch_add(1, Ordering::Relaxed) as u64;
1245 let num2 = COUNTER.fetch_add(1, Ordering::Relaxed) as u64;
1246 let num3 = COUNTER.fetch_add(1, Ordering::Relaxed) as u64;
1247 let sqids = Sqids::default();
1248 sqids.encode(&[num1, num2, num3 ]).unwrap().to_string()
1249 }
1250 #[tracing::instrument(skip_all)]
1252 async fn send(&self, msg: Envelope, timeout: Option<tokio::time::Duration>) -> Result<Envelope, OpenIAPError> {
1253 let response = self.send_noawait(msg).await;
1254 match response {
1255 Ok((response_rx, id)) => {
1256 let timeout = match timeout {
1257 Some(t) => t,
1258 None => self.get_default_timeout()
1259 };
1260 let result = tokio::time::timeout(timeout, response_rx).await;
1261 let inner = self.inner.lock().await;
1263 inner.queries.lock().await.remove(&id);
1264
1265 match result {
1266 Ok(Ok(response)) => Ok(response),
1267 Ok(Err(e)) => Err(OpenIAPError::CustomError(e.to_string())),
1268 Err(_) => Err(OpenIAPError::ClientError("Request timed out".to_string())),
1269 }
1270 }
1283 Err(e) => Err(OpenIAPError::CustomError(e.to_string())),
1284 }
1285 }
1286 #[tracing::instrument(skip_all)]
1289 async fn send_noawait(
1290 &self,
1291 mut msg: Envelope,
1292 ) -> Result<(oneshot::Receiver<Envelope>, String), OpenIAPError> {
1293 let (response_tx, response_rx) = oneshot::channel();
1294 let id = Client::get_uniqueid();
1295 msg.id = id.clone();
1296
1297 {
1299 let inner = self.inner.lock().await;
1300 inner.queries.lock().await.insert(id.clone(), response_tx);
1301 }
1302
1303 let res = self.send_envelope(msg).await;
1305 if let Err(e) = res {
1306 let inner = self.inner.lock().await;
1308 inner.queries.lock().await.remove(&id);
1309 return Err(OpenIAPError::ClientError(e.to_string()));
1310 }
1311
1312 Ok((response_rx, id))
1313 }
1314 #[tracing::instrument(skip_all)]
1316 async fn sendwithstream(
1317 &self,
1318 mut msg: Envelope,
1319 ) -> Result<(oneshot::Receiver<Envelope>, mpsc::Receiver<Vec<u8>>), OpenIAPError> {
1320 let (response_tx, response_rx) = oneshot::channel();
1321 let (stream_tx, stream_rx) = mpsc::channel(1024 * 1024);
1322 let id = Client::get_uniqueid();
1323 msg.id = id.clone();
1324 {
1325 let inner = self.inner.lock().await;
1326 inner.queries.lock().await.insert(id.clone(), response_tx);
1327 inner.streams.lock().await.insert(id.clone(), stream_tx);
1328 let res = self.send_envelope(msg).await;
1329 match res {
1330 Ok(_) => (),
1331 Err(e) => return Err(OpenIAPError::ClientError(e.to_string())),
1332 }
1333 }
1334 Ok((response_rx, stream_rx))
1335 }
1336 #[tracing::instrument(skip_all, target = "openiap::client")]
1337 async fn send_envelope(&self, mut envelope: Envelope) -> Result<(), OpenIAPError> {
1338 if (self.get_state() != ClientState::Connected && self.get_state() != ClientState::Signedin )
1339 && envelope.command != "signin" && envelope.command != "getelement" && envelope.command != "pong" {
1340 return Err(OpenIAPError::ClientError(format!("Not connected ( {:?} )", self.get_state())));
1341 }
1342 let env = envelope.clone();
1343 let command = envelope.command.clone();
1344 self.stats.lock().unwrap().package_tx += 1;
1345 match command.as_str() {
1346 "signin" => { self.stats.lock().unwrap().signin += 1;},
1347 "upload" => { self.stats.lock().unwrap().upload += 1;},
1348 "download" => { self.stats.lock().unwrap().download += 1;},
1349 "getdocumentversion" => { self.stats.lock().unwrap().getdocumentversion += 1;},
1350 "customcommand" => { self.stats.lock().unwrap().customcommand += 1;},
1351 "listcollections" => { self.stats.lock().unwrap().listcollections += 1;},
1352 "createcollection" => { self.stats.lock().unwrap().createcollection += 1;},
1353 "dropcollection" => { self.stats.lock().unwrap().dropcollection += 1;},
1354 "ensurecustomer" => { self.stats.lock().unwrap().ensurecustomer += 1;},
1355 "invokeopenrpa" => { self.stats.lock().unwrap().invokeopenrpa += 1;},
1356
1357 "registerqueue" => { self.stats.lock().unwrap().registerqueue += 1;},
1358 "registerexchange" => { self.stats.lock().unwrap().registerexchange += 1;},
1359 "unregisterqueue" => { self.stats.lock().unwrap().unregisterqueue += 1;},
1360 "watch" => { self.stats.lock().unwrap().watch += 1;},
1361 "unwatch" => { self.stats.lock().unwrap().unwatch += 1;},
1362 "queuemessage" => { self.stats.lock().unwrap().queuemessage += 1;},
1363
1364 "pushworkitem" => { self.stats.lock().unwrap().pushworkitem += 1;},
1365 "pushworkitems" => { self.stats.lock().unwrap().pushworkitems += 1;},
1366 "popworkitem" => { self.stats.lock().unwrap().popworkitem += 1;},
1367 "updateworkitem" => { self.stats.lock().unwrap().updateworkitem += 1;},
1368 "deleteworkitem" => { self.stats.lock().unwrap().deleteworkitem += 1;},
1369 "addworkitemqueue" => { self.stats.lock().unwrap().addworkitemqueue += 1;},
1370 "updateworkitemqueue" => { self.stats.lock().unwrap().updateworkitemqueue += 1;},
1371 "deleteworkitemqueue" => { self.stats.lock().unwrap().deleteworkitemqueue += 1;},
1372
1373 "getindexes" => { self.stats.lock().unwrap().getindexes += 1;},
1374 "createindex" => { self.stats.lock().unwrap().createindex += 1;},
1375 "dropindex" => { self.stats.lock().unwrap().dropindex += 1;},
1376 "query" => { self.stats.lock().unwrap().query += 1;},
1377 "count" => { self.stats.lock().unwrap().count += 1;},
1378 "distinct" => { self.stats.lock().unwrap().distinct += 1;},
1379 "aggregate" => { self.stats.lock().unwrap().aggregate += 1;},
1380 "insertone" => { self.stats.lock().unwrap().insertone += 1;},
1381 "insertmany" => { self.stats.lock().unwrap().insertmany += 1;},
1382 "updateone" => { self.stats.lock().unwrap().updateone += 1;},
1383 "insertorupdateone" => { self.stats.lock().unwrap().insertorupdateone += 1;},
1384 "insertorupdatemany" => { self.stats.lock().unwrap().insertorupdatemany += 1;},
1385 "updatedocument" => { self.stats.lock().unwrap().updatedocument += 1;},
1386 "deleteone" => { self.stats.lock().unwrap().deleteone += 1;},
1387 "deletemany" => { self.stats.lock().unwrap().deletemany += 1;},
1388 _ => {}
1389 };
1390 if envelope.id.is_empty() {
1391 let id = Client::get_uniqueid();
1392 envelope.id = id.clone();
1393 }
1394 trace!("Sending {} message, in the thread", command);
1395 let res = self.out_envelope_sender.send(env).await;
1396 if res.is_err() {
1397 error!("{:?}", res);
1398 let errmsg = res.unwrap_err().to_string();
1399 self.set_connected(ClientState::Disconnected, Some(&errmsg));
1400 return Err(OpenIAPError::ClientError(format!("Failed to send data: {}", errmsg)))
1401 } else {
1402 return Ok(())
1403 }
1404 }
1405 #[tracing::instrument(skip_all, target = "openiap::client")]
1406 async fn parse_incomming_envelope(&self, received: Envelope) {
1407 self.stats.lock().unwrap().package_rx += 1;
1408 let command = received.command.clone();
1409 trace!("parse_incomming_envelope, command: {}", command);
1410 let inner = self.inner.lock().await;
1411 let rid = received.rid.clone();
1412 let mut queries = inner.queries.lock().await;
1413 let mut streams = inner.streams.lock().await;
1414 let watches = inner.watches.lock().await;
1415 let queues = inner.queues.lock().await;
1416
1417 if command != "ping" && command != "pong" && command != "refreshtoken" {
1418 if rid.is_empty() {
1419 debug!("Received #{} #{} {} message", received.seq, received.id, command);
1420 } else {
1421 debug!("Received #{} #{} (reply to #{}) {} message", received.seq, received.id, rid, command);
1422 }
1423 } else if rid.is_empty() {
1424 trace!("Received #{} #{} {} message", received.seq, received.id, command);
1425 } else {
1426 trace!("Received #{} #{} (reply to #{}) {} message", received.seq, received.id, rid, command);
1427 }
1428
1429 if command == "ping" {
1430 self.pong(&received.id).await;
1431 } else if command == "refreshtoken" {
1433 } else if command == "beginstream"
1435 || command == "stream"
1436 || command == "endstream"
1437 {
1438 let streamresponse: Stream =
1439 prost::Message::decode(received.data.unwrap().value.as_ref()).unwrap();
1440 let streamdata = streamresponse.data;
1441 if !streamdata.is_empty() {
1442 let stream = streams.get(rid.as_str()).unwrap();
1443
1444 match stream.send(streamdata).await {
1445 Ok(_) => _ = (),
1446 Err(e) => error!("Failed to send data: {}", e),
1447 }
1448 }
1449 if command == "endstream" {
1450 let _ = streams.remove(rid.as_str());
1451 }
1452 } else if command == "watchevent" {
1453 let watchevent: WatchEvent =
1454 prost::Message::decode(received.data.unwrap().value.as_ref()).unwrap();
1455 if let Some(callback) = watches.get(watchevent.id.as_str()) {
1456 callback(watchevent);
1457 }
1458 } else if command == "queueevent" {
1459 let queueevent: QueueEvent =
1460 prost::Message::decode(received.data.unwrap().value.as_ref()).unwrap();
1461 if let Some(callback) = queues.get(queueevent.queuename.as_str()).cloned() {
1462 let queuename = queueevent.replyto.clone();
1463 let correlation_id = queueevent.correlation_id.clone();
1464 let me = self.clone();
1465 tokio::spawn(async move {
1466 let result_fut = callback(Arc::new(me.clone()), queueevent);
1467 let result = result_fut.await;
1468 if result.is_some() && !queuename.is_empty() {
1469 debug!("Sending return value from queue event callback to {}", queuename);
1470 let result = result.unwrap();
1471 let q = QueueMessageRequest {
1472 queuename,
1473 correlation_id,
1474 data: result,
1475 striptoken: true,
1476 ..Default::default()
1477 };
1478 let e = q.to_envelope();
1479 let send_result = me.send(e, None).await;
1480 if let Err(e) = send_result {
1481 error!("Failed to send queue event response: {}", e);
1482 }
1483 }
1484 });
1485 }
1486 } else if let Some(response_tx) = queries.remove(&rid) {
1487 let stream = streams.get(rid.as_str());
1488 if let Some(stream) = stream {
1489 let streamdata = vec![];
1490 match stream.send(streamdata).await {
1491 Ok(_) => _ = (),
1492 Err(e) => error!("Failed to send data: {}", e),
1493 }
1494 }
1495 let _ = response_tx.send(received);
1496 } else {
1497 error!("Received unhandled {} message: {:?}", command, received);
1498 }
1499 }
1500 #[tracing::instrument(skip_all)]
1502 async fn get_element(&self) -> Result<(), OpenIAPError> {
1503 let id = Client::get_uniqueid();
1504 let envelope = Envelope {
1505 id: id.clone(),
1506 command: "getelement".into(),
1507 ..Default::default()
1508 };
1509 let result = match self.send(envelope, None).await {
1510 Ok(res) => res,
1511 Err(e) => {
1512 return Err(e);
1513 },
1514 };
1515 if result.command == "pong" || result.command == "getelement" {
1516 Ok(())
1517 } else if result.command == "error" {
1518 let e: ErrorResponse = prost::Message::decode(result.data.unwrap().value.as_ref()).unwrap();
1519 Err(OpenIAPError::ServerError(e.message))
1520 } else {
1521 Err(OpenIAPError::ClientError("Failed to receive getelement".to_string()))
1522 }
1523 }
1524 #[tracing::instrument(skip_all)]
1526 async fn ping(&self) -> Result<(), OpenIAPError> {
1527 let id = Client::get_uniqueid();
1528 let envelope = Envelope {
1529 id: id.clone(),
1530 command: "getelement".into(),
1531 ..Default::default()
1532 };
1533 match self.send_envelope(envelope).await {
1534 Ok(_res) => Ok(()),
1535 Err(e) => {
1536 return Err(e);
1537 },
1538 }
1539 }
1540 #[tracing::instrument(skip_all)]
1542 async fn pong(&self, rid: &str) {
1543 let id = Client::get_uniqueid();
1544 let envelope = Envelope {
1545 id: id.clone(),
1546 command: "pong".into(),
1547 rid: rid.to_string(),
1548 ..Default::default()
1549 };
1550 match self.send_envelope(envelope).await {
1551 Ok(_) => (),
1552 Err(e) => error!("Failed to send pong: {}", e),
1553 }
1554 }
1555 #[tracing::instrument(skip_all)]
1561 pub async fn signin(&self, mut config: SigninRequest) -> Result<SigninResponse, OpenIAPError> {
1562 if config.username.is_empty() && config.password.is_empty() && config.jwt.is_empty() {
1564 if config.jwt.is_empty() {
1565 config.jwt = std::env::var("OPENIAP_JWT").unwrap_or_default();
1566 }
1567 if config.jwt.is_empty() {
1568 config.jwt = std::env::var("jwt").unwrap_or_default();
1569 }
1570 if config.jwt.is_empty() {
1572 if config.username.is_empty() {
1573 config.username = std::env::var("OPENIAP_USERNAME").unwrap_or_default();
1574 }
1575 if config.password.is_empty() {
1576 config.password = std::env::var("OPENIAP_PASSWORD").unwrap_or_default();
1577 }
1578 }
1579 }
1580 let version = env!("CARGO_PKG_VERSION");
1581 if !version.is_empty() && config.version.is_empty() {
1582 config.version = version.to_string();
1583 }
1584 if config.agent.is_empty() {
1585 config.agent = self.get_agent_name();
1586 }
1587
1588 let envelope = config.to_envelope();
1590 let result = self.send(envelope, None).await;
1591
1592 match &result {
1593 Ok(m) => {
1594 debug!("Sign-in reply received");
1595 if m.command == "error" {
1596 let e: ErrorResponse =
1597 prost::Message::decode(m.data.as_ref().unwrap().value.as_ref())
1598 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1599 return Err(OpenIAPError::ServerError(e.message));
1600 }
1601 debug!("Sign-in successful");
1602 let response: SigninResponse =
1603 prost::Message::decode(m.data.as_ref().unwrap().value.as_ref())
1604 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1605 if !config.validateonly {
1606 self.set_connected(ClientState::Signedin, None);
1607 self.set_user(Some(response.user.as_ref().unwrap().clone()));
1608 }
1609 Ok(response)
1610 }
1611 Err(e) => {
1612 debug!("Sending Sign-in request failed {:?}", result);
1613 debug!("Sign-in failed: {}", e.to_string());
1614 if !config.validateonly {
1615 self.set_user(None);
1616 }
1617 Err(OpenIAPError::ClientError(e.to_string()))
1618 }
1619 }
1620 }
1621 #[tracing::instrument(skip_all)]
1625 pub async fn list_collections(&self, includehist: bool) -> Result<String, OpenIAPError> {
1626 let config = ListCollectionsRequest::new(includehist);
1627 let envelope = config.to_envelope();
1628 let result = self.send(envelope, None).await;
1629 match result {
1630 Ok(m) => {
1631 let data = match m.data {
1632 Some(data) => data,
1633 None => {
1634 return Err(OpenIAPError::ClientError("No data returned".to_string()));
1635 }
1636 };
1637 if m.command == "error" {
1638 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1639 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1640 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1641 }
1642 let response: ListCollectionsResponse = prost::Message::decode(data.value.as_ref())
1643 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1644 Ok(response.results)
1645 }
1646 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1647 }
1648 }
1649 #[tracing::instrument(skip_all)]
1701 pub async fn create_collection(
1702 &self,
1703 config: CreateCollectionRequest,
1704 ) -> Result<(), OpenIAPError> {
1705 if config.collectionname.is_empty() {
1706 return Err(OpenIAPError::ClientError(
1707 "No collection name provided".to_string(),
1708 ));
1709 }
1710 let envelope = config.to_envelope();
1711 let result = self.send(envelope, None).await;
1712 match result {
1713 Ok(m) => {
1714 let data = match m.data {
1715 Some(data) => data,
1716 None => {
1717 return Err(OpenIAPError::ClientError("No data returned".to_string()));
1718 }
1719 };
1720 if m.command == "error" {
1721 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1722 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1723 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1724 }
1725 Ok(())
1726 }
1727 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1728 }
1729 }
1730 #[tracing::instrument(skip_all)]
1733 pub async fn drop_collection(&self, config: DropCollectionRequest) -> Result<(), OpenIAPError> {
1734 if config.collectionname.is_empty() {
1735 return Err(OpenIAPError::ClientError(
1736 "No collection name provided".to_string(),
1737 ));
1738 }
1739 let envelope = config.to_envelope();
1740 let result = self.send(envelope, None).await;
1741 match result {
1742 Ok(m) => {
1743 let data = match m.data {
1744 Some(data) => data,
1745 None => {
1746 return Err(OpenIAPError::ClientError("No data returned".to_string()));
1747 }
1748 };
1749 if m.command == "error" {
1750 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1751 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1752 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1753 }
1754 Ok(())
1755 }
1756 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1757 }
1758 }
1759 pub async fn get_indexes(&self, config: GetIndexesRequest) -> Result<String, OpenIAPError> {
1773 if config.collectionname.is_empty() {
1774 return Err(OpenIAPError::ClientError(
1775 "No collection name provided".to_string(),
1776 ));
1777 }
1778 let envelope = config.to_envelope();
1779 let result = self.send(envelope, None).await;
1780 match result {
1781 Ok(m) => {
1782 let data = match m.data {
1783 Some(data) => data,
1784 None => {
1785 return Err(OpenIAPError::ClientError("No data returned".to_string()));
1786 }
1787 };
1788 if m.command == "error" {
1789 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1790 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1791 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1792 }
1793 let response: GetIndexesResponse = prost::Message::decode(data.value.as_ref())
1794 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1795 Ok(response.results)
1796 }
1797 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1798 }
1799 }
1800 pub async fn create_index(&self, config: CreateIndexRequest) -> Result<(), OpenIAPError> {
1841 if config.collectionname.is_empty() {
1842 return Err(OpenIAPError::ClientError(
1843 "No collection name provided".to_string(),
1844 ));
1845 }
1846 if config.index.is_empty() {
1847 return Err(OpenIAPError::ClientError(
1848 "No index was provided".to_string(),
1849 ));
1850 }
1851 let envelope = config.to_envelope();
1852 let result = self.send(envelope, None).await;
1853 match result {
1854 Ok(m) => {
1855 let data = match m.data {
1856 Some(data) => data,
1857 None => {
1858 return Err(OpenIAPError::ClientError("No data returned".to_string()));
1859 }
1860 };
1861 if m.command == "error" {
1862 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1863 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1864 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1865 }
1866 Ok(())
1867 }
1868 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1869 }
1870 }
1871 pub async fn drop_index(&self, config: DropIndexRequest) -> Result<(), OpenIAPError> {
1874 if config.collectionname.is_empty() {
1875 return Err(OpenIAPError::ClientError(
1876 "No collection name provided".to_string(),
1877 ));
1878 }
1879 if config.name.is_empty() {
1880 return Err(OpenIAPError::ClientError(
1881 "No index name provided".to_string(),
1882 ));
1883 }
1884 let envelope = config.to_envelope();
1885 let result = self.send(envelope, None).await;
1886 match result {
1887 Ok(m) => {
1888 let data = match m.data {
1889 Some(data) => data,
1890 None => {
1891 return Err(OpenIAPError::ClientError("No data returned".to_string()));
1892 }
1893 };
1894 if m.command == "error" {
1895 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1896 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1897 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1898 }
1899 Ok(())
1900 }
1901 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1902 }
1903 }
1904 #[tracing::instrument(skip_all)]
1942 pub async fn query(&self, mut config: QueryRequest) -> Result<QueryResponse, OpenIAPError> {
1943 if config.collectionname.is_empty() {
1944 config.collectionname = "entities".to_string();
1945 }
1946
1947 let envelope = config.to_envelope();
1948 let result = self.send(envelope, None).await;
1949 match result {
1950 Ok(m) => {
1951 let data = match m.data {
1952 Some(data) => data,
1953 None => {
1954 return Err(OpenIAPError::ClientError("No data returned".to_string()));
1955 }
1956 };
1957 if m.command == "error" {
1958 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1959 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1960 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1961 }
1962 let response: QueryResponse = prost::Message::decode(data.value.as_ref())
1963 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1964 debug!("Return Ok(response)");
1965 Ok(response)
1966 }
1967 Err(e) => {
1968 debug!("Error !!");
1969 Err(OpenIAPError::ClientError(e.to_string()))
1970 }
1971 }
1972 }
1973 #[tracing::instrument(skip_all)]
1999 pub async fn get_one(&self, mut config: QueryRequest) -> Option<serde_json::Value> {
2000 if config.collectionname.is_empty() {
2001 config.collectionname = "entities".to_string();
2002 }
2003 config.top = 1;
2004 let envelope = config.to_envelope();
2005 let result = self.send(envelope, None).await;
2006 match result {
2007 Ok(m) => {
2008 let data = match m.data {
2009 Some(data) => data,
2010 None => return None,
2011 };
2012 if m.command == "error" {
2013 return None;
2014 }
2015 let response: QueryResponse = prost::Message::decode(data.value.as_ref()).ok()?;
2016
2017 let items: serde_json::Value = serde_json::from_str(&response.results).unwrap();
2018 let items: &Vec<serde_json::Value> = items.as_array().unwrap();
2019 if items.is_empty() {
2020 return None;
2021 }
2022 let item = items[0].clone();
2023 Some(item)
2024 }
2025 Err(_) => None,
2026 }
2027 }
2028
2029 #[tracing::instrument(skip_all)]
2139 pub async fn get_document_version(
2140 &self,
2141 mut config: GetDocumentVersionRequest,
2142 ) -> Result<String, OpenIAPError> {
2143 if config.collectionname.is_empty() {
2144 config.collectionname = "entities".to_string();
2145 }
2146 if config.id.is_empty() {
2147 return Err(OpenIAPError::ClientError("No id provided".to_string()));
2148 }
2149 let envelope = config.to_envelope();
2150 let result = self.send(envelope, None).await;
2151 match result {
2152 Ok(m) => {
2153 let data = match m.data {
2154 Some(data) => data,
2155 None => {
2156 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2157 }
2158 };
2159 if m.command == "error" {
2160 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2161 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2162 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2163 }
2164 let response: GetDocumentVersionResponse =
2165 prost::Message::decode(data.value.as_ref())
2166 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2167 Ok(response.result)
2168 }
2169 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2170 }
2171 }
2172 #[tracing::instrument(skip_all)]
2191 pub async fn aggregate(
2192 &self,
2193 mut config: AggregateRequest,
2194 ) -> Result<AggregateResponse, OpenIAPError> {
2195 if config.collectionname.is_empty() {
2196 config.collectionname = "entities".to_string();
2197 }
2198 if config.hint.is_empty() {
2199 config.hint = "".to_string();
2200 }
2201 if config.queryas.is_empty() {
2202 config.queryas = "".to_string();
2203 }
2204 if config.aggregates.is_empty() {
2205 return Err(OpenIAPError::ClientError(
2206 "No aggregates provided".to_string(),
2207 ));
2208 }
2209 let envelope = config.to_envelope();
2210 let result = self.send(envelope, None).await;
2211 match result {
2212 Ok(m) => {
2213 let data = match m.data {
2214 Some(data) => data,
2215 None => {
2216 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2217 }
2218 };
2219 if m.command == "error" {
2220 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2221 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2222 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2223 }
2224 let response: AggregateResponse = prost::Message::decode(data.value.as_ref())
2225 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2226 Ok(response)
2227 }
2228 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2229 }
2230 }
2231 #[tracing::instrument(skip_all)]
2233 pub async fn count(&self, mut config: CountRequest) -> Result<CountResponse, OpenIAPError> {
2234 if config.collectionname.is_empty() {
2235 config.collectionname = "entities".to_string();
2236 }
2237 if config.query.is_empty() {
2238 config.query = "{}".to_string();
2239 }
2240 let envelope = config.to_envelope();
2241 let result = self.send(envelope, None).await;
2242 match result {
2243 Ok(m) => {
2244 let data = match m.data {
2245 Some(data) => data,
2246 None => {
2247 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2248 }
2249 };
2250 if m.command == "error" {
2251 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2252 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2253 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2254 }
2255 let response: CountResponse = prost::Message::decode(data.value.as_ref())
2256 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2257 Ok(response)
2258 }
2259 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2260 }
2261 }
2262 #[tracing::instrument(skip_all)]
2264 pub async fn distinct(
2265 &self,
2266 mut config: DistinctRequest,
2267 ) -> Result<DistinctResponse, OpenIAPError> {
2268 if config.collectionname.is_empty() {
2269 config.collectionname = "entities".to_string();
2270 }
2271 if config.query.is_empty() {
2272 config.query = "{}".to_string();
2273 }
2274 if config.field.is_empty() {
2275 return Err(OpenIAPError::ClientError("No field provided".to_string()));
2276 }
2277 let envelope = config.to_envelope();
2278 let result = self.send(envelope, None).await;
2279 match result {
2280 Ok(m) => {
2281 let data = match m.data {
2282 Some(data) => data,
2283 None => {
2284 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2285 }
2286 };
2287 if m.command == "error" {
2288 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2289 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2290 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2291 }
2292 let response: DistinctResponse = prost::Message::decode(data.value.as_ref())
2293 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2294 Ok(response)
2295 }
2296 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2297 }
2298 }
2299 #[tracing::instrument(skip_all)]
2301 pub async fn insert_one(
2302 &self,
2303 config: InsertOneRequest,
2304 ) -> Result<InsertOneResponse, OpenIAPError> {
2305 let envelope = config.to_envelope();
2306 let result = self.send(envelope, None).await;
2307 match result {
2308 Ok(m) => {
2309 let data = match m.data {
2310 Some(data) => data,
2311 None => {
2312 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2313 }
2314 };
2315 if m.command == "error" {
2316 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2317 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2318 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2319 }
2320 let response: InsertOneResponse = prost::Message::decode(data.value.as_ref())
2321 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2322 Ok(response)
2323 }
2324 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2325 }
2326 }
2327 #[tracing::instrument(skip_all)]
2329 pub async fn insert_many(
2330 &self,
2331 config: InsertManyRequest,
2332 ) -> Result<InsertManyResponse, OpenIAPError> {
2333 let envelope = config.to_envelope();
2334 let result = self.send(envelope, None).await;
2335 match result {
2336 Ok(m) => {
2337 let data = match m.data {
2338 Some(data) => data,
2339 None => {
2340 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2341 }
2342 };
2343 if m.command == "error" {
2344 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2345 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2346 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2347 }
2348 let response: InsertManyResponse = prost::Message::decode(data.value.as_ref())
2349 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2350 Ok(response)
2351 }
2352 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2353 }
2354 }
2355 #[tracing::instrument(skip_all)]
2357 pub async fn update_one(
2358 &self,
2359 config: UpdateOneRequest,
2360 ) -> Result<UpdateOneResponse, OpenIAPError> {
2361 let envelope = config.to_envelope();
2362 let result = self.send(envelope, None).await;
2363 match result {
2364 Ok(m) => {
2365 let data = match m.data {
2366 Some(data) => data,
2367 None => {
2368 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2369 }
2370 };
2371 if m.command == "error" {
2372 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2373 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2374 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2375 }
2376 let response: UpdateOneResponse = prost::Message::decode(data.value.as_ref())
2377 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2378 Ok(response)
2379 }
2380 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2381 }
2382 }
2383 #[tracing::instrument(skip_all)]
2385 pub async fn insert_or_update_one(
2386 &self,
2387 config: InsertOrUpdateOneRequest,
2388 ) -> Result<String, OpenIAPError> {
2389 let envelope = config.to_envelope();
2390 let result = self.send(envelope, None).await;
2391 match result {
2392 Ok(m) => {
2393 let data = match m.data {
2394 Some(data) => data,
2395 None => {
2396 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2397 }
2398 };
2399 if m.command == "error" {
2400 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2401 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2402 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2403 }
2404 let response: InsertOrUpdateOneResponse =
2405 prost::Message::decode(data.value.as_ref())
2406 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2407 Ok(response.result)
2408 }
2409 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2410 }
2411 }
2412 #[tracing::instrument(skip_all)]
2414 pub async fn insert_or_update_many(
2415 &self,
2416 config: InsertOrUpdateManyRequest,
2417 ) -> Result<InsertOrUpdateManyResponse, OpenIAPError> {
2418 let envelope = config.to_envelope();
2419 let result = self.send(envelope, None).await;
2420 match result {
2421 Ok(m) => {
2422 let data = match m.data {
2423 Some(data) => data,
2424 None => {
2425 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2426 }
2427 };
2428 if m.command == "error" {
2429 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2430 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2431 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2432 }
2433 let response: InsertOrUpdateManyResponse =
2434 prost::Message::decode(data.value.as_ref())
2435 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2436 Ok(response)
2437 }
2438 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2439 }
2440 }
2441 #[tracing::instrument(skip_all)]
2443 pub async fn update_document(
2444 &self,
2445 config: UpdateDocumentRequest,
2446 ) -> Result<UpdateDocumentResponse, OpenIAPError> {
2447 let envelope = config.to_envelope();
2448 let result = self.send(envelope, None).await;
2449 match result {
2450 Ok(m) => {
2451 let data = match m.data {
2452 Some(data) => data,
2453 None => {
2454 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2455 }
2456 };
2457 if m.command == "error" {
2458 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2459 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2460 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2461 }
2462 let response: UpdateDocumentResponse = prost::Message::decode(data.value.as_ref())
2463 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2464 Ok(response)
2465 }
2466 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2467 }
2468 }
2469 #[tracing::instrument(skip_all)]
2471 pub async fn delete_one(&self, config: DeleteOneRequest) -> Result<i32, OpenIAPError> {
2472 let envelope = config.to_envelope();
2473 let result = self.send(envelope, None).await;
2474 match result {
2475 Ok(m) => {
2476 let data = match m.data {
2477 Some(data) => data,
2478 None => {
2479 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2480 }
2481 };
2482 if m.command == "error" {
2483 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2484 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2485 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2486 }
2487 let response: DeleteOneResponse = prost::Message::decode(data.value.as_ref())
2488 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2489 Ok(response.affectedrows)
2490 }
2491 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2492 }
2493 }
2494 #[tracing::instrument(skip_all)]
2496 pub async fn delete_many(&self, config: DeleteManyRequest) -> Result<i32, OpenIAPError> {
2497 let envelope = config.to_envelope();
2498 let result = self.send(envelope, None).await;
2499 match result {
2500 Ok(m) => {
2501 let data = match m.data {
2502 Some(data) => data,
2503 None => {
2504 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2505 }
2506 };
2507 if m.command == "error" {
2508 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2509 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2510 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2511 }
2512 let response: DeleteManyResponse = prost::Message::decode(data.value.as_ref())
2513 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2514 Ok(response.affectedrows)
2515 }
2516 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2517 }
2518 }
2519 #[tracing::instrument(skip_all)]
2521 pub async fn download(
2522 &self,
2523 config: DownloadRequest,
2524 folder: Option<&str>,
2525 filename: Option<&str>,
2526 ) -> Result<DownloadResponse, OpenIAPError> {
2527 let envelope = config.to_envelope();
2528 match self.sendwithstream(envelope).await {
2529 Ok((response_rx, mut stream_rx)) => {
2530 let temp_file_path = util::generate_unique_filename("openiap");
2531 debug!("Temp file: {:?}", temp_file_path);
2532 let mut temp_file = File::create(&temp_file_path).map_err(|e| {
2533 OpenIAPError::ClientError(format!("Failed to create temp file: {}", e))
2534 })?;
2535 while !stream_rx.is_closed() {
2536 match stream_rx.recv().await {
2537 Some(received) => {
2538 if received.is_empty() {
2539 debug!("Stream closed");
2540 break;
2541 }
2542 debug!("Received {} bytes", received.len());
2543 temp_file.write_all(&received).map_err(|e| {
2544 OpenIAPError::ClientError(format!(
2545 "Failed to write to temp file: {}",
2546 e
2547 ))
2548 })?;
2549 }
2550 None => {
2551 debug!("Stream closed");
2552 break;
2553 }
2554 }
2555 }
2556 temp_file.sync_all().map_err(|e| {
2557 OpenIAPError::ClientError(format!("Failed to sync temp file: {}", e))
2558 })?;
2559
2560 let response = response_rx.await.map_err(|_| {
2561 OpenIAPError::ClientError("Failed to receive response".to_string())
2562 })?;
2563
2564 if response.command == "error" {
2565 let data = match response.data {
2566 Some(data) => data,
2567 None => {
2568 return Err(OpenIAPError::ClientError(
2569 "No data returned for SERVER error".to_string(),
2570 ));
2571 }
2572 };
2573 let e: ErrorResponse = prost::Message::decode(data.value.as_ref()).unwrap();
2574 return Err(OpenIAPError::ServerError(e.message));
2575 }
2576 let mut downloadresponse: DownloadResponse =
2577 prost::Message::decode(response.data.unwrap().value.as_ref()).unwrap();
2578
2579 let mut final_filename = match &filename {
2580 Some(f) => f,
2581 None => downloadresponse.filename.as_str(),
2582 };
2583 if final_filename.is_empty() {
2584 final_filename = downloadresponse.filename.as_str();
2585 }
2586 let mut folder = match &folder {
2587 Some(f) => f,
2588 None => ".",
2589 };
2590 if folder.is_empty() {
2591 folder = ".";
2592 }
2593 let filepath = format!("{}/{}", folder, final_filename);
2594 trace!("Moving file to {}", filepath);
2595 util::move_file(temp_file_path.to_str().unwrap(), filepath.as_str()).map_err(|e| {
2596 OpenIAPError::ClientError(format!("Failed to move file: {}", e))
2597 })?;
2598 debug!("Downloaded file to {}", filepath);
2599 downloadresponse.filename = filepath;
2600
2601 Ok(downloadresponse)
2602 }
2603 Err(status) => Err(OpenIAPError::ClientError(status.to_string())),
2604 }
2605 }
2606 #[tracing::instrument(skip_all)]
2608 pub async fn upload(
2609 &self,
2610 config: UploadRequest,
2611 filepath: &str,
2612 ) -> Result<UploadResponse, OpenIAPError> {
2613 debug!("upload: Uploading file: {}", filepath);
2672 let mut file = File::open(filepath)
2673 .map_err(|e| OpenIAPError::ClientError(format!("Failed to open file: {}", e)))?;
2674 let chunk_size = 1024 * 1024;
2675 let mut buffer = vec![0; chunk_size];
2676
2677 let envelope = config.to_envelope();
2679 let (response_rx, rid) = self.send_noawait(envelope).await?;
2680
2681 let envelope = BeginStream::from_rid(rid.clone());
2683 debug!("Sending beginstream to #{}", rid);
2684 if let Err(e) = self.send_envelope(envelope).await {
2685 let inner = self.inner.lock().await;
2686 inner.queries.lock().await.remove(&rid);
2687 return Err(OpenIAPError::ClientError(format!("Failed to send data: {}", e)));
2688 }
2689
2690 let mut counter = 0;
2692 loop {
2693 let bytes_read = file.read(&mut buffer).map_err(|e| {
2694 OpenIAPError::ClientError(format!("Failed to read from file: {}", e))
2695 })?;
2696 counter += 1;
2697
2698 if bytes_read == 0 {
2699 break;
2700 }
2701
2702 let chunk = buffer[..bytes_read].to_vec();
2703 let envelope = Stream::from_rid(chunk, rid.clone());
2704 debug!("Sending chunk {} stream to #{}", counter, envelope.rid);
2705 if let Err(e) = self.send_envelope(envelope).await {
2706 let inner = self.inner.lock().await;
2707 inner.queries.lock().await.remove(&rid);
2708 return Err(OpenIAPError::ClientError(format!("Failed to send data: {}", e)));
2709 }
2710 }
2711
2712 let envelope = EndStream::from_rid(rid.clone());
2714 debug!("Sending endstream to #{}", rid);
2715 if let Err(e) = self.send_envelope(envelope).await {
2716 let inner = self.inner.lock().await;
2717 inner.queries.lock().await.remove(&rid);
2718 return Err(OpenIAPError::ClientError(format!("Failed to send data: {}", e)));
2719 }
2720
2721 debug!("Wait for upload response for #{}", rid);
2723 let result = response_rx.await;
2724 let inner = self.inner.lock().await;
2725 inner.queries.lock().await.remove(&rid);
2726
2727 match result {
2728 Ok(response) => {
2729 if response.command == "error" {
2730 let error_response: ErrorResponse = prost::Message::decode(
2731 response.data.unwrap().value.as_ref(),
2732 )
2733 .map_err(|e| {
2734 OpenIAPError::ClientError(format!("Failed to decode ErrorResponse: {}", e))
2735 })?;
2736 return Err(OpenIAPError::ServerError(error_response.message));
2737 }
2738 let upload_response: UploadResponse =
2739 prost::Message::decode(response.data.unwrap().value.as_ref()).map_err(|e| {
2740 OpenIAPError::ClientError(format!("Failed to decode UploadResponse: {}", e))
2741 })?;
2742 Ok(upload_response)
2743 }
2744 Err(e) => Err(OpenIAPError::CustomError(e.to_string())),
2745 }
2746 }
2747 #[tracing::instrument(skip_all)]
2749 pub async fn watch(
2750 &self,
2751 mut config: WatchRequest,
2752 callback: Box<dyn Fn(WatchEvent) + Send + Sync>,
2753 ) -> Result<String, OpenIAPError> {
2754 if config.collectionname.is_empty() {
2755 config.collectionname = "entities".to_string();
2756 }
2757 if config.paths.is_empty() {
2758 config.paths = vec!["".to_string()];
2759 }
2760
2761 let envelope = config.to_envelope();
2762 let result = self.send(envelope, None).await;
2763 match result {
2764 Ok(m) => {
2765 let data = match m.data {
2766 Some(data) => data,
2767 None => {
2768 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2769 }
2770 };
2771 if m.command == "error" {
2772 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2773 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2774 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2775 }
2776 let response: WatchResponse = prost::Message::decode(data.value.as_ref())
2777 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2778
2779 let inner = self.inner.lock().await;
2780 inner
2781 .watches
2782 .lock()
2783 .await
2784 .insert(response.id.clone(), callback);
2785
2786 Ok(response.id)
2787 }
2788 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2789 }
2790 }
2791 #[tracing::instrument(skip_all)]
2793 pub async fn unwatch(&self, id: &str) -> Result<(), OpenIAPError> {
2794 let config = UnWatchRequest::byid(id);
2795 let envelope = config.to_envelope();
2796 let result = self.send(envelope, None).await;
2797 match result {
2798 Ok(m) => {
2799 let data = match m.data {
2800 Some(data) => data,
2801 None => {
2802 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2803 }
2804 };
2805 if m.command == "error" {
2806 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2807 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2808 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2809 }
2810 Ok(())
2811 }
2812 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2813 }
2814 }
2815 #[tracing::instrument(skip_all)]
2817 pub async fn register_queue(
2818 &self,
2819 mut config: RegisterQueueRequest,
2820 callback: QueueCallbackFn,
2821 ) -> Result<String, OpenIAPError> {
2822 if config.queuename.is_empty() {
2823 config.queuename = "".to_string();
2824 }
2825
2826 let envelope = config.to_envelope();
2827 let result = self.send(envelope, None).await;
2828 match result {
2829 Ok(m) => {
2830 let data = match m.data {
2831 Some(data) => data,
2832 None => {
2833 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2834 }
2835 };
2836 if m.command == "error" {
2837 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2838 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2839 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2840 }
2841 let response: RegisterQueueResponse =
2842 prost::Message::decode(data.value.as_ref())
2843 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2844
2845 let inner = self.inner.lock().await;
2846 inner
2847 .queues
2848 .lock()
2849 .await
2850 .insert(response.queuename.clone(), callback);
2851
2852 Ok(response.queuename)
2853 }
2854 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2855 }
2856 }
2857 #[tracing::instrument(skip_all)]
2859 pub async fn unregister_queue(&self, queuename: &str) -> Result<(), OpenIAPError> {
2860 let config = UnRegisterQueueRequest::byqueuename(queuename);
2861 let envelope = config.to_envelope();
2862 let result = self.send(envelope, None).await;
2863 match result {
2864 Ok(m) => {
2865 let data = match m.data {
2866 Some(data) => data,
2867 None => {
2868 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2869 }
2870 };
2871 if m.command == "error" {
2872 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2873 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2874 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2875 }
2876 Ok(())
2877 }
2878 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2879 }
2880 }
2881 #[tracing::instrument(skip_all)]
2883 pub async fn register_exchange(
2884 &self,
2885 mut config: RegisterExchangeRequest,
2886 callback: QueueCallbackFn,
2887 ) -> Result<String, OpenIAPError> {
2888 if config.exchangename.is_empty() {
2889 return Err(OpenIAPError::ClientError(
2890 "No exchange name provided".to_string(),
2891 ));
2892 }
2893 if config.algorithm.is_empty() {
2894 config.algorithm = "fanout".to_string();
2895 }
2896 let envelope = config.to_envelope();
2897 let result = self.send(envelope, None).await;
2898 match result {
2899 Ok(m) => {
2900 let data = match m.data {
2901 Some(data) => data,
2902 None => {
2903 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2904 }
2905 };
2906 if m.command == "error" {
2907 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2908 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2909 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2910 }
2911 let response: RegisterExchangeResponse =
2912 prost::Message::decode(data.value.as_ref())
2913 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2914 if !response.queuename.is_empty() {
2915 let inner = self.inner.lock().await;
2916 inner
2917 .queues
2918 .lock()
2919 .await
2920 .insert(response.queuename.clone(), callback);
2921 }
2922 Ok(response.queuename)
2923 }
2924 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2925 }
2926 }
2927 #[tracing::instrument(skip_all)]
2929 pub async fn queue_message(
2930 &self,
2931 config: QueueMessageRequest,
2932 ) -> Result<QueueMessageResponse, OpenIAPError> {
2933 if config.queuename.is_empty() && config.exchangename.is_empty() {
2934 return Err(OpenIAPError::ClientError(
2935 "No queue or exchange name provided".to_string(),
2936 ));
2937 }
2938 let envelope = config.to_envelope();
2939 let result = self.send(envelope, None).await;
2940 match result {
2941 Ok(m) => {
2942 let data = match m.data {
2943 Some(d) => d,
2944 None => {
2945 return Err(OpenIAPError::ClientError("No data in response".to_string()))
2946 }
2947 };
2948 if m.command == "error" {
2949 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2950 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2951 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2952 }
2953 let response: QueueMessageResponse = prost::Message::decode(data.value.as_ref())
2954 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2955 Ok(response)
2956 }
2957 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2958 }
2959 }
2960 #[tracing::instrument(skip_all)]
2962 pub async fn rpc(&self, mut config: QueueMessageRequest, timeout: tokio::time::Duration) -> Result<String, OpenIAPError> {
2963 if config.queuename.is_empty() && config.exchangename.is_empty() {
2964 return Err(OpenIAPError::ClientError(
2965 "No queue or exchange name provided".to_string(),
2966 ));
2967 }
2968
2969 let (tx, rx) = oneshot::channel::<String>();
2970 let tx = Arc::new(std::sync::Mutex::new(Some(tx)));
2971
2972 let callback: QueueCallbackFn = Arc::new(move |_client: Arc<Client>, event: QueueEvent| {
2974 if let Some(tx) = tx.lock().unwrap().take() {
2975 let _ = tx.send(event.data);
2976 } else {
2977 debug!("Queue already closed");
2978 }
2979 Box::pin(async { None })
2980 });
2981
2982 let mut reply_queue_guard = self.rpc_reply_queue.lock().await;
2984 let mut callback_guard = self.rpc_callback.lock().await;
2985
2986 let state = self.get_state();
2988 if reply_queue_guard.is_none() || !(state == ClientState::Connected || state == ClientState::Signedin) {
2989 let q = self
2991 .register_queue(
2992 RegisterQueueRequest {
2993 queuename: "".to_string(),
2994 },
2995 callback.clone(),
2996 )
2997 .await?;
2998 *reply_queue_guard = Some(q.clone());
2999 *callback_guard = Some(callback.clone());
3000 } else {
3001 if let Some(qname) = reply_queue_guard.as_ref() {
3003 let inner = self.inner.lock().await;
3004 inner.queues.lock().await.insert(qname.clone(), callback.clone());
3005 }
3006 }
3007
3008 let q = reply_queue_guard.as_ref().unwrap().clone();
3009 config.replyto = q.clone();
3010 let envelope = config.to_envelope();
3011
3012 let result = self.send(envelope, None).await;
3013 let rpc_result = match result {
3014 Ok(m) => {
3015 let data = match m.data {
3016 Some(d) => d,
3017 None => {
3018 return Err(OpenIAPError::ClientError("No data in response".to_string()))
3019 }
3020 };
3021 if m.command == "error" {
3022 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3023 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3024 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3025 }
3026
3027 match tokio::time::timeout(timeout, rx).await {
3028 Ok(Ok(val)) => Ok(val),
3029 Ok(Err(e)) => Err(OpenIAPError::CustomError(e.to_string())),
3030 Err(_) => {
3031 *reply_queue_guard = None;
3033 *callback_guard = None;
3034 Err(OpenIAPError::ClientError("RPC request timed out".to_string()))
3035 },
3036 }
3037 }
3038 Err(e) => {
3039 *reply_queue_guard = None;
3041 *callback_guard = None;
3042 Err(OpenIAPError::ClientError(e.to_string()))
3043 }
3044 };
3045
3046 rpc_result
3047 }
3048 #[tracing::instrument(skip_all)]
3122 pub async fn push_workitem(
3123 &self,
3124 mut config: PushWorkitemRequest,
3125 ) -> Result<PushWorkitemResponse, OpenIAPError> {
3126 if config.wiq.is_empty() && config.wiqid.is_empty() {
3127 return Err(OpenIAPError::ClientError(
3128 "No queue name or id provided".to_string(),
3129 ));
3130 }
3131 for f in &mut config.files {
3132 if f.filename.is_empty() && f.file.is_empty() {
3133 debug!("Filename is empty");
3134 } else if !f.filename.is_empty() && f.file.is_empty() && f.id.is_empty() {
3135 if !std::path::Path::new(&f.filename).exists() {
3137 debug!("File does not exist: {}", f.filename);
3138 } else {
3139 let filesize = std::fs::metadata(&f.filename).unwrap().len();
3140 if filesize < 5 * 1024 * 1024 {
3142 debug!("File {} exists so ATTACHING it.", f.filename);
3143 let filename = std::path::Path::new(&f.filename)
3144 .file_name()
3145 .unwrap()
3146 .to_str()
3147 .unwrap();
3148 f.file = std::fs::read(&f.filename).unwrap();
3149 f.file = util::compress_file_to_vec(&f.filename).unwrap();
3152 f.compressed = true;
3153 f.filename = filename.to_string();
3154 f.id = "findme".to_string();
3155 trace!(
3156 "File {} was read and assigned to f.file, size: {}",
3157 f.filename,
3158 f.file.len()
3159 );
3160 } else {
3161 debug!("File {} exists so UPLOADING it.", f.filename);
3162 let filename = std::path::Path::new(&f.filename)
3163 .file_name()
3164 .unwrap()
3165 .to_str()
3166 .unwrap();
3167 let uploadconfig = UploadRequest {
3168 filename: filename.to_string(),
3169 collectionname: "fs.files".to_string(),
3170 ..Default::default()
3171 };
3172 let uploadresult = self.upload(uploadconfig, &f.filename).await.unwrap();
3173 trace!("File {} was upload as {}", filename, uploadresult.id);
3174 f.id = uploadresult.id.clone();
3176 f.filename = filename.to_string();
3177 }
3178 }
3179 } else {
3180 debug!("File {} is already uploaded", f.filename);
3181 }
3182 }
3183 let envelope = config.to_envelope();
3184 let result = self.send(envelope, None).await;
3185 match result {
3186 Ok(m) => {
3187 let data = match m.data {
3188 Some(data) => data,
3189 None => {
3190 return Err(OpenIAPError::ClientError("No data returned".to_string()));
3191 }
3192 };
3193 if m.command == "error" {
3194 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3195 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3196 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3197 }
3198 let response: PushWorkitemResponse = prost::Message::decode(data.value.as_ref())
3199 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3200 Ok(response)
3201 }
3202 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3203 }
3204 }
3205 #[tracing::instrument(skip_all)]
3209 pub async fn push_workitems(
3210 &self,
3211 mut config: PushWorkitemsRequest,
3212 ) -> Result<PushWorkitemsResponse, OpenIAPError> {
3213 if config.wiq.is_empty() && config.wiqid.is_empty() {
3214 return Err(OpenIAPError::ClientError(
3215 "No queue name or id provided".to_string(),
3216 ));
3217 }
3218 for wi in &mut config.items {
3219 for f in &mut wi.files {
3220 if f.filename.is_empty() && f.file.is_empty() {
3221 debug!("Filename is empty");
3222 } else if !f.filename.is_empty() && f.file.is_empty() && f.id.is_empty() {
3223 if !std::path::Path::new(&f.filename).exists() {
3225 debug!("File does not exist: {}", f.filename);
3226 } else {
3227 let filesize = std::fs::metadata(&f.filename).unwrap().len();
3228 if filesize < 5 * 1024 * 1024 {
3230 debug!("File {} exists so ATTACHING it.", f.filename);
3231 let filename = std::path::Path::new(&f.filename)
3232 .file_name()
3233 .unwrap()
3234 .to_str()
3235 .unwrap();
3236 f.file = std::fs::read(&f.filename).unwrap();
3237 f.file = util::compress_file_to_vec(&f.filename).unwrap();
3240 f.compressed = true;
3241 f.filename = filename.to_string();
3242 f.id = "findme".to_string();
3243 trace!(
3244 "File {} was read and assigned to f.file, size: {}",
3245 f.filename,
3246 f.file.len()
3247 );
3248 } else {
3249 debug!("File {} exists so UPLOADING it.", f.filename);
3250 let filename = std::path::Path::new(&f.filename)
3251 .file_name()
3252 .unwrap()
3253 .to_str()
3254 .unwrap();
3255 let uploadconfig = UploadRequest {
3256 filename: filename.to_string(),
3257 collectionname: "fs.files".to_string(),
3258 ..Default::default()
3259 };
3260 let uploadresult =
3261 self.upload(uploadconfig, &f.filename).await.unwrap();
3262 trace!("File {} was upload as {}", filename, uploadresult.id);
3263 f.id = uploadresult.id.clone();
3265 f.filename = filename.to_string();
3266 }
3267 }
3268 } else {
3269 debug!("File {} is already uploaded", f.filename);
3270 }
3271 }
3272 }
3273 let envelope = config.to_envelope();
3274 let result = self.send(envelope, None).await;
3275 match result {
3276 Ok(m) => {
3277 let data = match m.data {
3278 Some(data) => data,
3279 None => {
3280 return Err(OpenIAPError::ClientError("No data returned".to_string()));
3281 }
3282 };
3283 if m.command == "error" {
3284 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3285 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3286 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3287 }
3288 let response: PushWorkitemsResponse =
3289 prost::Message::decode(data.value.as_ref())
3290 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3291 Ok(response)
3292 }
3293 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3294 }
3295 }
3296 #[tracing::instrument(skip_all)]
3299 pub async fn pop_workitem(
3300 &self,
3301 config: PopWorkitemRequest,
3302 downloadfolder: Option<&str>,
3303 ) -> Result<PopWorkitemResponse, OpenIAPError> {
3304 if config.wiq.is_empty() && config.wiqid.is_empty() {
3305 return Err(OpenIAPError::ClientError(
3306 "No queue name or id provided".to_string(),
3307 ));
3308 }
3309 let envelope = config.to_envelope();
3310 let result = self.send(envelope, None).await;
3311 match result {
3312 Ok(m) => {
3313 let data = match m.data {
3314 Some(data) => data,
3315 None => {
3316 return Err(OpenIAPError::ClientError("No data returned".to_string()));
3317 }
3318 };
3319 if m.command == "error" {
3320 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3321 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3322 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3323 }
3324 let response: PopWorkitemResponse = prost::Message::decode(data.value.as_ref())
3325 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3326
3327 match &response.workitem {
3328 Some(wi) => {
3329 for f in &wi.files {
3330 if !f.id.is_empty() {
3331 let downloadconfig = DownloadRequest {
3332 id: f.id.clone(),
3333 collectionname: "fs.files".to_string(),
3334 ..Default::default()
3335 };
3336 let downloadresult =
3337 match self.download(downloadconfig, downloadfolder, None).await
3338 {
3339 Ok(r) => r,
3340 Err(e) => {
3341 debug!("Failed to download file: {}", e);
3342 continue;
3343 }
3344 };
3345 debug!(
3346 "File {} was downloaded as {}",
3347 f.filename, downloadresult.filename
3348 );
3349 }
3350 }
3351 }
3352 None => {
3353 debug!("No workitem found");
3354 }
3355 }
3356 Ok(response)
3357 }
3358 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3359 }
3360 }
3361 #[tracing::instrument(skip_all)]
3367 pub async fn update_workitem(
3368 &self,
3369 mut config: UpdateWorkitemRequest,
3370 ) -> Result<UpdateWorkitemResponse, OpenIAPError> {
3371 match &config.workitem {
3372 Some(wiq) => {
3373 if wiq.id.is_empty() {
3374 return Err(OpenIAPError::ClientError(
3375 "No workitem id provided".to_string(),
3376 ));
3377 }
3378 }
3379 None => {
3380 return Err(OpenIAPError::ClientError(
3381 "No workitem provided".to_string(),
3382 ));
3383 }
3384 }
3385 for f in &mut config.files {
3386 if f.filename.is_empty() && f.file.is_empty() {
3387 debug!("Filename is empty");
3388 } else if !f.filename.is_empty() && f.file.is_empty() && f.id.is_empty() {
3389 if !std::path::Path::new(&f.filename).exists() {
3390 debug!("File does not exist: {}", f.filename);
3391 } else {
3392 debug!("File {} exists so uploading it.", f.filename);
3393 let filename = std::path::Path::new(&f.filename)
3394 .file_name()
3395 .unwrap()
3396 .to_str()
3397 .unwrap();
3398 let uploadconfig = UploadRequest {
3399 filename: filename.to_string(),
3400 collectionname: "fs.files".to_string(),
3401 ..Default::default()
3402 };
3403 let uploadresult = self.upload(uploadconfig, &f.filename).await.unwrap();
3404 trace!("File {} was upload as {}", filename, uploadresult.id);
3405 f.id = uploadresult.id.clone();
3406 f.filename = filename.to_string();
3407 }
3408 } else {
3409 debug!("Skipped file");
3410 }
3411 }
3412 let envelope = config.to_envelope();
3413 let result = self.send(envelope, None).await;
3414 match result {
3415 Ok(m) => {
3416 let data = match m.data {
3417 Some(d) => d,
3418 None => {
3419 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3420 }
3421 };
3422 if m.command == "error" {
3423 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3424 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3425 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3426 }
3427 let response: UpdateWorkitemResponse = prost::Message::decode(data.value.as_ref())
3428 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3429 Ok(response)
3430 }
3431 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3432 }
3433 }
3434 #[tracing::instrument(skip_all)]
3436 pub async fn delete_workitem(
3437 &self,
3438 config: DeleteWorkitemRequest,
3439 ) -> Result<DeleteWorkitemResponse, OpenIAPError> {
3440 if config.id.is_empty() {
3441 return Err(OpenIAPError::ClientError(
3442 "No workitem id provided".to_string(),
3443 ));
3444 }
3445 let envelope = config.to_envelope();
3446 let result = self.send(envelope, None).await;
3447 match result {
3448 Ok(m) => {
3449 let data = match m.data {
3450 Some(d) => d,
3451 None => {
3452 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3453 }
3454 };
3455 if m.command == "error" {
3456 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3457 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3458 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3459 }
3460 let response: DeleteWorkitemResponse = prost::Message::decode(data.value.as_ref())
3461 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3462 Ok(response)
3463 }
3464 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3465 }
3466 }
3467 #[tracing::instrument(skip_all)]
3469 pub async fn add_workitem_queue(
3470 &self,
3471 config: AddWorkItemQueueRequest,
3472 ) -> Result<WorkItemQueue, OpenIAPError> {
3473 if config.workitemqueue.is_none() {
3474 return Err(OpenIAPError::ClientError(
3475 "No workitem queue name provided".to_string(),
3476 ));
3477 }
3478 let envelope = config.to_envelope();
3479 let result = self.send(envelope, None).await;
3480 match result {
3481 Ok(m) => {
3482 let data = match m.data {
3483 Some(d) => d,
3484 None => {
3485 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3486 }
3487 };
3488 if m.command == "error" {
3489 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3490 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3491 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3492 }
3493 let response: AddWorkItemQueueResponse =
3494 prost::Message::decode(data.value.as_ref())
3495 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3496 match response.workitemqueue {
3497 Some(wiq) => Ok(wiq),
3498 None => {
3499 return Err(OpenIAPError::ClientError(
3500 "No workitem queue returned".to_string(),
3501 ));
3502 }
3503 }
3504 }
3505 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3506 }
3507 }
3508 #[tracing::instrument(skip_all)]
3510 pub async fn update_workitem_queue(
3511 &self,
3512 config: UpdateWorkItemQueueRequest,
3513 ) -> Result<WorkItemQueue, OpenIAPError> {
3514 if config.workitemqueue.is_none() {
3515 return Err(OpenIAPError::ClientError(
3516 "No workitem queue name provided".to_string(),
3517 ));
3518 }
3519 let envelope = config.to_envelope();
3520 let result = self.send(envelope, None).await;
3521 match result {
3522 Ok(m) => {
3523 let data = match m.data {
3524 Some(d) => d,
3525 None => {
3526 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3527 }
3528 };
3529 if m.command == "error" {
3530 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3531 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3532 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3533 }
3534 let response: UpdateWorkItemQueueResponse =
3535 prost::Message::decode(data.value.as_ref())
3536 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3537 match response.workitemqueue {
3538 Some(wiq) => Ok(wiq),
3539 None => {
3540 return Err(OpenIAPError::ClientError(
3541 "No workitem queue returned".to_string(),
3542 ));
3543 }
3544 }
3545 }
3546 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3547 }
3548 }
3549 #[tracing::instrument(skip_all)]
3551 pub async fn delete_workitem_queue(
3552 &self,
3553 config: DeleteWorkItemQueueRequest,
3554 ) -> Result<(), OpenIAPError> {
3555 if config.wiq.is_empty() && config.wiqid.is_empty() {
3556 return Err(OpenIAPError::ClientError(
3557 "No workitem queue name or id provided".to_string(),
3558 ));
3559 }
3560 let envelope = config.to_envelope();
3561 let result = self.send(envelope, None).await;
3562 match result {
3563 Ok(m) => {
3564 let data = match m.data {
3565 Some(d) => d,
3566 None => {
3567 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3568 }
3569 };
3570 if m.command == "error" {
3571 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3572 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3573 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3574 }
3575 Ok(())
3576 }
3577 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3578 }
3579 }
3580 #[tracing::instrument(skip_all)]
3582 pub async fn custom_command(
3583 &self,
3584 config: CustomCommandRequest,
3585 timeout: Option<tokio::time::Duration>,
3586 ) -> Result<String, OpenIAPError> {
3587 if config.command.is_empty() {
3588 return Err(OpenIAPError::ClientError("No command provided".to_string()));
3589 }
3590 let envelope = config.to_envelope();
3591 let result = self.send(envelope, timeout).await;
3592 match result {
3593 Ok(m) => {
3594 let data = match m.data {
3595 Some(d) => d,
3596 None => {
3597 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3598 }
3599 };
3600 if m.command == "error" {
3601 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3602 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3603 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3604 }
3605 let response: CustomCommandResponse =
3606 prost::Message::decode(data.value.as_ref())
3607 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3608 Ok(response.result)
3609 }
3610 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3611 }
3612 }
3613 #[tracing::instrument(skip_all)]
3615 pub async fn delete_package(&self, packageid: &str) -> Result<(), OpenIAPError> {
3616 let config = DeletePackageRequest::byid(packageid);
3617 let envelope = config.to_envelope();
3618 let result = self.send(envelope, None).await;
3619 match result {
3620 Ok(m) => {
3621 let data = match m.data {
3622 Some(data) => data,
3623 None => {
3624 return Err(OpenIAPError::ClientError("No data returned".to_string()));
3625 }
3626 };
3627 if m.command == "error" {
3628 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3629 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3630 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3631 }
3632 Ok(())
3635 }
3636 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3637 }
3638 }
3639 #[tracing::instrument(skip_all)]
3641 pub async fn start_agent(&self, agentid: &str) -> Result<(), OpenIAPError> {
3642 let config = StartAgentRequest::byid(agentid);
3643 let envelope = config.to_envelope();
3644 let result = self.send(envelope, None).await;
3645 match result {
3646 Ok(m) => {
3647 let data = match m.data {
3648 Some(d) => d,
3649 None => {
3650 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3651 }
3652 };
3653 if m.command == "error" {
3654 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3655 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3656 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3657 }
3658 Ok(())
3661 }
3662 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3663 }
3664 }
3665 #[tracing::instrument(skip_all)]
3667 pub async fn stop_agent(&self, agentid: &str) -> Result<(), OpenIAPError> {
3668 let config = StopAgentRequest::byid(agentid);
3669 let envelope = config.to_envelope();
3670 let result = self.send(envelope, None).await;
3671 match result {
3672 Ok(m) => {
3673 let data = match m.data {
3674 Some(d) => d,
3675 None => {
3676 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3677 }
3678 };
3679 if m.command == "error" {
3680 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3681 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3682 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3683 }
3684 Ok(())
3687 }
3688 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3689 }
3690 }
3691 #[tracing::instrument(skip_all)]
3693 pub async fn delete_agent_pod(&self, agentid: &str, podname: &str) -> Result<(), OpenIAPError> {
3694 let config = DeleteAgentPodRequest::byid(agentid, podname);
3695 let envelope = config.to_envelope();
3696 let result = self.send(envelope, None).await;
3697 match result {
3698 Ok(m) => {
3699 let data = match m.data {
3700 Some(d) => d,
3701 None => {
3702 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3703 }
3704 };
3705 if m.command == "error" {
3706 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3707 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3708 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3709 }
3710 Ok(())
3713 }
3714 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3715 }
3716 }
3717 #[tracing::instrument(skip_all)]
3719 pub async fn delete_agent(&self, agentid: &str) -> Result<(), OpenIAPError> {
3720 let config = DeleteAgentRequest::byid(agentid);
3721 let envelope = config.to_envelope();
3722 let result = self.send(envelope, None).await;
3723 match result {
3724 Ok(m) => {
3725 let data = match m.data {
3726 Some(d) => d,
3727 None => {
3728 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3729 }
3730 };
3731 if m.command == "error" {
3732 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3733 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3734 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3735 }
3736 Ok(())
3739 }
3740 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3741 }
3742 }
3743 #[tracing::instrument(skip_all)]
3745 pub async fn get_agent_pods(&self, agentid: &str, stats: bool) -> Result<String, OpenIAPError> {
3746 let config = GetAgentPodsRequest::byid(agentid, stats);
3747 let envelope = config.to_envelope();
3748 let result = self.send(envelope, None).await;
3749 match result {
3750 Ok(m) => {
3751 let data = match m.data {
3752 Some(d) => d,
3753 None => {
3754 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3755 }
3756 };
3757 if m.command == "error" {
3758 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3759 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3760 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3761 }
3762 let response: GetAgentPodsResponse = prost::Message::decode(data.value.as_ref())
3763 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3764 Ok(response.results)
3765 }
3766 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3767 }
3768 }
3769 #[tracing::instrument(skip_all)]
3771 pub async fn get_agent_pod_logs(
3772 &self,
3773 agentid: &str,
3774 podname: &str,
3775 ) -> Result<String, OpenIAPError> {
3776 let config = GetAgentLogRequest::new(agentid, podname);
3777 let envelope = config.to_envelope();
3778 let result = self.send(envelope, None).await;
3779 match result {
3780 Ok(m) => {
3781 let data = match m.data {
3782 Some(d) => d,
3783 None => {
3784 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3785 }
3786 };
3787 if m.command == "error" {
3788 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3789 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3790 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3791 }
3792 let response: GetAgentLogResponse = prost::Message::decode(data.value.as_ref())
3793 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3794 Ok(response.result)
3795 }
3796 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3797 }
3798 }
3799
3800 #[tracing::instrument(skip_all)]
3803 pub async fn ensure_customer(
3804 &self,
3805 config: EnsureCustomerRequest,
3806 ) -> Result<EnsureCustomerResponse, OpenIAPError> {
3807 if config.customer.is_none() && config.stripe.is_none() {
3808 return Err(OpenIAPError::ClientError(
3809 "No customer or stripe provided".to_string(),
3810 ));
3811 }
3812 let envelope = config.to_envelope();
3813 let result = self.send(envelope, None).await;
3814 match result {
3815 Ok(m) => {
3816 let data = match m.data {
3817 Some(d) => d,
3818 None => {
3819 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3820 }
3821 };
3822 if m.command == "error" {
3823 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3824 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3825 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3826 }
3827 let response: EnsureCustomerResponse = prost::Message::decode(data.value.as_ref())
3828 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3829 Ok(response)
3830 }
3831 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3832 }
3833 }
3834 #[tracing::instrument(skip_all)]
3836 pub async fn create_workflow_instance(
3837 &self,
3838 config: CreateWorkflowInstanceRequest,
3839 ) -> Result<String, OpenIAPError> {
3840 if config.workflowid.is_empty() {
3841 return Err(OpenIAPError::ClientError(
3842 "No workflow id provided".to_string(),
3843 ));
3844 }
3845 let envelope = config.to_envelope();
3846 let result = self.send(envelope, None).await;
3847 match result {
3848 Ok(m) => {
3849 let data = match m.data {
3850 Some(d) => d,
3851 None => {
3852 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3853 }
3854 };
3855 if m.command == "error" {
3856 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3857 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3858 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3859 }
3860 let response: CreateWorkflowInstanceResponse =
3861 prost::Message::decode(data.value.as_ref())
3862 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3863 Ok(response.instanceid)
3864 }
3865 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3866 }
3867 }
3868
3869 #[tracing::instrument(skip_all)]
3871 pub async fn invoke_openrpa(
3872 &self,
3873 config: InvokeOpenRpaRequest,
3874 ) -> Result<String, OpenIAPError> {
3875 if config.robotid.is_empty() {
3876 return Err(OpenIAPError::ClientError(
3877 "No robot id provided".to_string(),
3878 ));
3879 }
3880 if config.workflowid.is_empty() {
3881 return Err(OpenIAPError::ClientError(
3882 "No workflow id provided".to_string(),
3883 ));
3884 }
3885
3886 let (tx, rx) = oneshot::channel::<String>();
3887 let tx = Arc::new(std::sync::Mutex::new(Some(tx)));
3888
3889 let q = self
3890 .register_queue(
3891 RegisterQueueRequest {
3892 queuename: "".to_string(),
3893 },
3894 Arc::new(move |_client: Arc<Client>, event: QueueEvent| {
3895 let tx = tx.clone();
3896 Box::pin(async move {
3897 let json = event.data.clone();
3898 let obj = serde_json::from_str::<serde_json::Value>(&json).unwrap();
3899 let command: String = obj["command"].as_str().unwrap().to_string();
3900 debug!("Received event: {:?}", event);
3901 if command.eq("invokesuccess") {
3902 debug!("Robot successfully started running workflow");
3903 } else if command.eq("invokeidle") {
3904 debug!("Workflow went idle");
3905 } else if command.eq("invokeerror") {
3906 debug!("Robot failed to run workflow");
3907 let tx = tx.lock().unwrap().take().unwrap();
3908 tx.send(event.data).unwrap();
3909 } else if command.eq("timeout") {
3910 debug!("No robot picked up the workflow");
3911 let tx = tx.lock().unwrap().take().unwrap();
3912 tx.send(event.data).unwrap();
3913 } else if command.eq("invokecompleted") {
3914 debug!("Robot completed running workflow");
3915 let tx = tx.lock().unwrap().take().unwrap();
3916 tx.send(event.data).unwrap();
3917 } else {
3918 let tx = tx.lock().unwrap().take().unwrap();
3919 tx.send(event.data).unwrap();
3920 }
3921 None
3922 })
3923 }),
3924 )
3925 .await
3926 .unwrap();
3927 debug!("Registered Response Queue: {:?}", q);
3928 let data = format!(
3929 "{{\"command\":\"invoke\",\"workflowid\":\"{}\",\"payload\": {}}}",
3930 config.workflowid, config.payload
3931 );
3932 debug!("Send Data: {}", data);
3933 debug!("To Queue: {} With reply to: {}", config.robotid, q);
3934 let config = QueueMessageRequest {
3935 queuename: config.robotid.clone(),
3936 replyto: q.clone(),
3937 data,
3938 ..Default::default()
3939 };
3940
3941 let envelope = config.to_envelope();
3942
3943 let result = self.send(envelope, None).await;
3944 match result {
3945 Ok(m) => {
3946 let data = match m.data {
3947 Some(d) => d,
3948 None => {
3949 return Err(OpenIAPError::ClientError("No data in response".to_string()))
3950 }
3951 };
3952 if m.command == "error" {
3953 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3954 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3955 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3956 }
3957 let json = rx.await.unwrap();
3961 debug!("Received json result: {:?}", json);
3962 let obj = serde_json::from_str::<serde_json::Value>(&json).unwrap();
3963 let command: String = obj["command"].as_str().unwrap().to_string();
3964 let mut data = "".to_string();
3965 if obj["data"].as_str().is_some() {
3966 data = obj["data"].as_str().unwrap().to_string();
3967 } else if obj["data"].as_object().is_some() {
3968 data = obj["data"].to_string();
3969 }
3970 if !command.eq("invokecompleted") {
3971 if command.eq("timeout") {
3972 return Err(OpenIAPError::ServerError("Timeout".to_string()));
3973 } else {
3974 if data.is_empty() {
3975 return Err(OpenIAPError::ServerError(
3976 "Error with no message".to_string(),
3977 ));
3978 }
3979 return Err(OpenIAPError::ServerError(data));
3980 }
3981 }
3982 let response = self.unregister_queue(&q).await;
3983 match response {
3984 Ok(_) => {
3985 debug!("Unregistered Response Queue: {:?}", q);
3986 }
3987 Err(e) => {
3988 error!("Failed to unregister Response Queue: {:?}", e);
3989 }
3990 }
3991 Ok(data)
3992 }
3993 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3994 }
3995 }
3996}
3997