1#![warn(missing_docs)]
2pub use openiap_proto::errors::*;
25pub use openiap_proto::openiap::*;
26pub use openiap_proto::*;
27pub use prost_types::Timestamp;
28pub use openiap::flow_service_client::FlowServiceClient;
29use sqids::Sqids;
30use std::fmt::{Display,Formatter};
31use tokio::task::JoinHandle;
32use tokio_tungstenite::WebSocketStream;
33use tracing::{debug, error, info, trace};
34type StdError = Box<dyn std::error::Error + Send + Sync + 'static>;
35type Result<T, E = StdError> = ::std::result::Result<T, E>;
36use std::fs::File;
37use std::io::{Read, Write};
38use std::sync::atomic::{AtomicUsize, Ordering};
39use std::sync::Arc;
40use tokio::sync::Mutex;
41use tonic::transport::Channel;
42
43use tokio::sync::{mpsc, oneshot};
44
45use std::env;
46use std::time::Duration;
47
48#[cfg(feature = "otel")]
49mod otel;
50mod tests;
51mod ws;
52mod grpc;
53mod util;
54pub use crate::util::{set_otel_url, enable_tracing, disable_tracing};
55pub use crate::otel::{set_f64_observable_gauge, set_u64_observable_gauge, set_i64_observable_gauge, disable_observable_gauge};
56
57type QuerySender = oneshot::Sender<Envelope>;
58type StreamSender = mpsc::Sender<Vec<u8>>;
59type Sock = WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>;
60use futures::StreamExt;
61use async_channel::unbounded;
62#[derive(Clone)]
68pub struct Client {
69 connect_called: Arc<std::sync::Mutex<bool>>,
71 runtime: Arc<std::sync::Mutex<Option<tokio::runtime::Runtime>>>,
73
74 stats: Arc<std::sync::Mutex<ClientStatistics>>,
76
77 task_handles: Arc<std::sync::Mutex<Vec<JoinHandle<()>>>>,
78 pub client: Arc<std::sync::Mutex<ClientEnum>>,
80 user: Arc<std::sync::Mutex<Option<User>>>,
82 pub inner: Arc<Mutex<ClientInner>>,
84 pub config: Arc<std::sync::Mutex<Option<Config>>>,
86 pub auto_reconnect: Arc<std::sync::Mutex<bool>>,
88 pub url: Arc<std::sync::Mutex<String>>,
90 pub username: Arc<std::sync::Mutex<String>>,
92 pub password: Arc<std::sync::Mutex<String>>,
94 pub jwt: Arc<std::sync::Mutex<String>>,
96 service_name: Arc<std::sync::Mutex<String>>,
97 agent_name: Arc<std::sync::Mutex<String>>,
98 agent_version: Arc<std::sync::Mutex<String>>,
99 event_sender: async_channel::Sender<ClientEvent>,
100 event_receiver: async_channel::Receiver<ClientEvent>,
101 out_envelope_sender: async_channel::Sender<Envelope>,
102 out_envelope_receiver: async_channel::Receiver<Envelope>,
103 rpc_reply_queue: Arc<tokio::sync::Mutex<Option<String>>>,
104 rpc_callback: Arc<tokio::sync::Mutex<Option<QueueCallbackFn>>>,
105
106 pub state: Arc<std::sync::Mutex<ClientState>>,
108 pub msgcount: Arc<std::sync::Mutex<i32>>,
110 pub reconnect_ms: Arc<std::sync::Mutex<i32>>,
112 pub default_timeout: Arc<std::sync::Mutex<tokio::time::Duration>>,
114}
115#[derive(Clone, Default)]
117pub struct ClientStatistics {
118 connection_attempts: u64,
119 connections: u64,
120 package_tx: u64,
121 package_rx: u64,
122 signin: u64,
123 download: u64,
124 getdocumentversion: u64,
125 customcommand: u64,
126 listcollections: u64,
127 createcollection: u64,
128 dropcollection: u64,
129 ensurecustomer: u64,
130 invokeopenrpa: u64,
131 registerqueue: u64,
132 registerexchange: u64,
133 unregisterqueue: u64,
134 watch: u64,
135 unwatch: u64,
136 queuemessage: u64,
137 pushworkitem: u64,
138 pushworkitems: u64,
139 popworkitem: u64,
140 updateworkitem: u64,
141 deleteworkitem: u64,
142 addworkitemqueue: u64,
143 updateworkitemqueue: u64,
144 deleteworkitemqueue: u64,
145 getindexes: u64,
146 createindex: u64,
147 dropindex: u64,
148 upload: u64,
149 query: u64,
150 count: u64,
151 distinct: u64,
152 aggregate: u64,
153 insertone: u64,
154 insertmany: u64,
155 insertorupdateone: u64,
156 insertorupdatemany: u64,
157 updateone: u64,
158 updatedocument: u64,
159 deleteone: u64,
160 deletemany: u64,
161}
162use futures::future::BoxFuture;
164type QueueCallbackFn =
165 Arc<dyn Fn(Arc<Client>, QueueEvent) -> BoxFuture<'static, Option<String>> + Send + Sync>;
166
167#[derive(Clone)]
170pub struct ClientInner {
171 pub queries: Arc<Mutex<std::collections::HashMap<String, QuerySender>>>,
173 pub streams: Arc<Mutex<std::collections::HashMap<String, StreamSender>>>,
175 #[allow(clippy::type_complexity)]
177 pub watches:
178 Arc<Mutex<std::collections::HashMap<String, Box<dyn Fn(WatchEvent) + Send + Sync>>>>,
179 #[allow(clippy::type_complexity)]
181 pub queues:
182 Arc<Mutex<std::collections::HashMap<String, QueueCallbackFn>>>,
183}
184#[derive(Clone, Debug)]
186pub enum ClientEnum {
187 None,
189 Grpc(FlowServiceClient<tonic::transport::Channel>),
191 WS(Arc<Mutex<Sock>>)
193}
194#[derive(Debug, Clone, PartialEq)]
196pub enum ClientEvent {
197 Connecting,
199 Connected,
201 Disconnected(String),
203 SignedIn,
205 }
218#[derive(Debug, Clone, PartialEq)]
220pub enum ClientState {
221 Disconnected,
223 Connecting,
225 Connected,
227 Signedin
229}
230impl Display for ClientState {
231 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
232 match self {
233 ClientState::Disconnected => write!(f, "Disconnected"),
234 ClientState::Connecting => write!(f, "Connecting"),
235 ClientState::Connected => write!(f, "Connected"),
236 ClientState::Signedin => write!(f, "Signedin"),
237 }
238 }
239}
240
241#[derive(Debug, Clone, serde::Deserialize)]
243#[allow(dead_code)]
244pub struct Config {
245 #[serde(default)]
246 wshost: String,
247 #[serde(default)]
248 wsurl: String,
249 #[serde(default)]
250 domain: String,
251 #[serde(default)]
252 auto_create_users: bool,
253 #[serde(default)]
254 namespace: String,
255 #[serde(default)]
256 agent_domain_schema: String,
257 #[serde(default)]
258 version: String,
259 #[serde(default)]
260 validate_emails: bool,
261 #[serde(default)]
262 forgot_pass_emails: bool,
263 #[serde(default)]
264 supports_watch: bool,
265 #[serde(default)]
266 amqp_enabled_exchange: bool,
267 #[serde(default)]
268 multi_tenant: bool,
269 #[serde(default)]
270 enable_entity_restriction: bool,
271 #[serde(default)]
272 enable_web_tours: bool,
273 #[serde(default)]
274 collections_with_text_index: Vec<String>,
275 #[serde(default)]
276 timeseries_collections: Vec<String>,
277 #[serde(default)]
278 ping_clients_interval: i32,
279 #[serde(default)]
280 validlicense: bool,
281 #[serde(default)]
282 forceddomains: Vec<String>,
283 #[serde(default)]
284 grafana_url: String,
285 #[serde(default)]
286 otel_metric_url: String,
287 #[serde(default)]
288 otel_trace_url: String,
289 #[serde(default)]
290 otel_log_url: String,
291 #[serde(default)]
292 enable_analytics: bool,
293}
294impl std::fmt::Debug for ClientInner {
295 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
296 f.debug_struct("ClientInner")
297 .field("queries", &self.queries)
299 .field("streams", &self.streams)
300 .finish()
301 }
302}
303impl Default for Client {
304 fn default() -> Self {
305 Self::new()
306 }
307}
308
309#[derive(Clone, PartialEq)]
312pub struct EnvConfig {
313 pub jwt: String,
315 pub traceid: String,
317 pub spanid: String,
319}
320impl EnvConfig {
321 #[tracing::instrument(skip_all)]
323 pub fn new() -> Self {
324 Self {
325 jwt: String::new(),
326 traceid: String::new(),
327 spanid: String::new(),
328 }
329 }
330 #[tracing::instrument(skip_all)]
332 pub fn with_jwt(jwt: &str) -> Self {
333 Self {
334 jwt: jwt.to_string(),
335 traceid: String::new(),
336 spanid: String::new(),
337 }
338 }
339}
340
341impl Client {
342 pub fn new() -> Self {
344 let (ces, cer) = unbounded::<ClientEvent>();
345 let (out_es, out_er) = unbounded::<Envelope>();
346 let version = env!("CARGO_PKG_VERSION");
347 Self {
348 task_handles: Arc::new(std::sync::Mutex::new(Vec::new())),
349 stats: Arc::new(std::sync::Mutex::new(ClientStatistics::default())),
350 user: Arc::new(std::sync::Mutex::new(None)),
351 client: Arc::new(std::sync::Mutex::new(ClientEnum::None)),
352 connect_called: Arc::new(std::sync::Mutex::new(false)),
353 runtime: Arc::new(std::sync::Mutex::new(None)),
354 msgcount: Arc::new(std::sync::Mutex::new(-1)),
355 reconnect_ms: Arc::new(std::sync::Mutex::new(1000)),
356 rpc_reply_queue: Arc::new(tokio::sync::Mutex::new(None)),
357 rpc_callback: Arc::new(tokio::sync::Mutex::new(None)),
358 inner: Arc::new(Mutex::new(ClientInner {
359 queries: Arc::new(Mutex::new(std::collections::HashMap::new())),
360 streams: Arc::new(Mutex::new(std::collections::HashMap::new())),
361 watches: Arc::new(Mutex::new(std::collections::HashMap::new())),
362 queues: Arc::new(Mutex::new(std::collections::HashMap::new())),
363 })),
364 config: Arc::new(std::sync::Mutex::new(None)),
365 auto_reconnect: Arc::new(std::sync::Mutex::new(true)),
366 url: Arc::new(std::sync::Mutex::new("".to_string())),
367 username: Arc::new(std::sync::Mutex::new("".to_string())),
368 password: Arc::new(std::sync::Mutex::new("".to_string())),
369 jwt: Arc::new(std::sync::Mutex::new("".to_string())),
370 service_name: Arc::new(std::sync::Mutex::new("rust".to_string())),
371 agent_name: Arc::new(std::sync::Mutex::new("rust".to_string())),
372 agent_version: Arc::new(std::sync::Mutex::new(version.to_string())),
373 event_sender: ces,
374 event_receiver: cer,
375 out_envelope_sender: out_es,
376 out_envelope_receiver: out_er,
377 state: Arc::new(std::sync::Mutex::new(ClientState::Disconnected)),
378 default_timeout: Arc::new(std::sync::Mutex::new(Duration::from_secs(30))),
379 }
380 }
381 #[tracing::instrument(skip_all)]
383 pub fn connect(&self, dst: &str) -> Result<(), OpenIAPError> {
384 let rt = match tokio::runtime::Runtime::new() {
385 Ok(rt) => rt,
386 Err(e) => {
387 return Err(OpenIAPError::ClientError(format!(
388 "Failed to create tokio runtime: {}",
389 e
390 )));
391 }
392 };
393 self.set_runtime(Some(rt));
394 tokio::task::block_in_place(|| {
395 let handle = self.get_runtime_handle();
396 handle.block_on(self.connect_async(dst))
397 })
398 }
399
400 #[allow(unused_variables)]
402 pub async fn load_config(&self, strurl: &str, url: &url::Url) -> Option<Config> {
403 let config: Option<Config>;
404 let issecure = url.scheme() == "https" || url.scheme() == "wss" || url.port() == Some(443);
405 let mut port = url.port().unwrap_or(80);
406 if issecure {
407 port = 443;
408 }
409 let mut host = url.host_str().unwrap_or("localhost.openiap.io").replace("grpc.", "");
410 if host.starts_with("api-grpc") {
411 host = "api".to_string();
412 }
413 if port == 50051 {
414 port = 3000;
415 }
416 let configurl = if issecure {
417 format!(
418 "{}://{}:{}/config",
419 "https",
420 host,
421 port
422 )
423 } else {
424 format!(
425 "{}://{}:{}/config",
426 "http",
427 host,
428 port
429 )
430 };
431
432 let configurl = url::Url::parse(configurl.as_str())
433 .map_err(|e| OpenIAPError::ClientError(format!("Failed to parse URL: {}", e))).expect("wefew");
434 trace!("Getting config from: {}", configurl);
435 let o = minreq::get(configurl).with_timeout(5).send();
436 match o {
437 Ok(_) => {
438 let response = match o {
439 Ok(response) => response,
440 Err(e) => {
441 error!("Failed to get config: {}", e);
442 return None;
443 }
444 };
445 if response.status_code == 200 {
446 let body = response.as_str().unwrap();
447 config = Some(match serde_json::from_str(body) {
448 Ok(config) => config,
449 Err(e) => {
450 error!("Failed to parse config: {}", e);
451 return None;
452 }
453 });
454 } else {
455 config = None;
456 }
457 }
458 Err(e) => {
459 error!("Failed to get config: {}", e);
460 return None;
461 }
462 }
463 let mut _enable_analytics = true;
464 let mut _otel_metric_url = std::env::var("OTEL_METRIC_URL").unwrap_or_default();
465 let mut _otel_trace_url = std::env::var("OTEL_TRACE_URL").unwrap_or_default();
466 let mut _otel_log_url = std::env::var("OTEL_LOG_URL").unwrap_or_default();
467 let mut apihostname = url.host_str().unwrap_or("localhost.openiap.io").to_string();
468 if apihostname.starts_with("grpc.") {
469 apihostname = apihostname[5..].to_string();
470 }
471
472 if config.is_some() {
473 let config = config.as_ref().unwrap();
474 if !config.otel_metric_url.is_empty() {
475 _otel_metric_url = config.otel_metric_url.clone();
476 }
477 if !config.otel_trace_url.is_empty() {
478 _otel_trace_url = config.otel_trace_url.clone();
479 }
480 if !config.otel_log_url.is_empty() {
481 _otel_log_url = config.otel_log_url.clone();
482 }
483 if !config.domain.is_empty() {
484 apihostname = config.domain.clone();
485 }
486 _enable_analytics = config.enable_analytics;
487 }
488 #[cfg(feature = "otel")]
489 if _enable_analytics {
490 let service_name = self.get_service_name();
491 let agent_name = self.get_agent_name();
492 let agent_version = self.get_agent_version();
493 let version = env!("CARGO_PKG_VERSION");
494 match otel::init_telemetry(&service_name, &agent_name, &agent_version, &version, &apihostname, _otel_metric_url.as_str(),
495 _otel_trace_url.as_str(), _otel_log_url.as_str(),
496 &self.stats) {
497 Ok(_) => (),
498 Err(e) => {
499 error!("Failed to initialize telemetry: {}", e);
500 return None;
501 }
502 }
503 }
504 config
505 }
506
507 #[tracing::instrument(skip_all)]
509 pub async fn connect_async(&self, dst: &str) -> Result<(), OpenIAPError> {
510 #[cfg(test)]
511 {
512 enable_tracing("openiap=error", "");
516 }
518 if self.is_connect_called() {
519 self.set_auto_reconnect(true);
520 return self.reconnect().await;
521 }
522 let mut strurl = dst.to_string();
523 if strurl.is_empty() {
524 strurl = std::env::var("apiurl").unwrap_or("".to_string());
525 if strurl.is_empty() {
526 strurl = std::env::var("grpcapiurl").unwrap_or("".to_string());
527 }
528 if strurl.is_empty() {
529 strurl = std::env::var("OPENIAP_URL").unwrap_or("".to_string());
530 }
531 if strurl.is_empty() {
532 strurl = std::env::var("OPENIAP_APIURL").unwrap_or("".to_string());
533 }
534 if strurl.is_empty() {
535 strurl = std::env::var("wsapiurl").unwrap_or("".to_string());
536 }
537 }
538 if strurl.is_empty() {
539 return Err(OpenIAPError::ClientError("No URL provided".to_string()));
540 }
541 let url = url::Url::parse(strurl.as_str())
542 .map_err(|e| OpenIAPError::ClientError(format!("Failed to parse URL: {}", e)))?;
543 let mut username = "".to_string();
544 let mut password = "".to_string();
545 if let Some(p) = url.password() {
546 password = p.to_string();
547 }
548 if !url.username().is_empty() {
549 username = url.username().to_string();
550 }
551 if !username.is_empty() && !password.is_empty() {
552 self.set_username(&username);
553 self.set_password(&password);
554 }
555 let usegprc = url.scheme() == "grpc" || url.domain().unwrap_or("localhost").to_lowercase().starts_with("grpc.") || url.port() == Some(50051);
556 if url.scheme() != "http"
557 && url.scheme() != "https"
558 && url.scheme() != "grpc"
559 && url.scheme() != "ws"
560 && url.scheme() != "wss"
561 {
562 return Err(OpenIAPError::ClientError("Invalid URL scheme".to_string()));
563 }
564 if url.scheme() == "grpc" {
565 if url.port() == Some(443) {
566 strurl = format!("https://{}", url.host_str().unwrap_or("app.openiap.io"));
567 } else {
568 strurl = format!("http://{}:{}", url.host_str().unwrap_or("app.openiap.io"), url.port().unwrap_or(80));
569 }
570 }
571 let url = url::Url::parse(strurl.as_str())
572 .map_err(|e| OpenIAPError::ClientError(format!("Failed to parse URL: {}", e)))?;
573 if url.port().is_none() {
574 strurl = format!(
575 "{}://{}",
576 url.scheme(),
577 url.host_str().unwrap_or("app.openiap.io")
578 );
579 } else {
580 strurl = format!(
581 "{}://{}:{}",
582 url.scheme(),
583 url.host_str().unwrap_or("localhost.openiap.io"),
584 url.port().unwrap_or(80)
585 );
586 }
587 debug!("Connecting to {}", strurl);
588 let config = self.load_config(strurl.as_str(), &url).await;
589 if !usegprc {
590 strurl = format!("{}/ws/v2", strurl);
591
592 let (_stream_tx, stream_rx) = mpsc::channel(60);
593
594 let socket = match tokio_tungstenite::connect_async(strurl.clone()).await {
595 Ok((socket, _)) => socket,
596 Err(e) => {
597 return Err(OpenIAPError::ClientError(format!(
598 "Failed to connect to WS: {}",
599 e
600 )));
601 }
602 };
603 self.set_client(ClientEnum::WS(Arc::new(Mutex::new(socket))));
604 self.set_connect_called(true);
605 self.set_config(config);
606 self.set_url(&strurl);
607 match self.setup_ws(&strurl).await {
608 Ok(_) => (),
609 Err(e) => {
610 return Err(OpenIAPError::ClientError(format!(
611 "Failed to setup WS: {}",
612 e
613 )));
614 }
615 }
616 let client2 = self.clone();
617 tokio::task::spawn(async move {
619 tokio_stream::wrappers::ReceiverStream::new(stream_rx)
620 .for_each(|envelope: Envelope| async {
621 let command = envelope.command.clone();
622 let rid = envelope.rid.clone();
623 let id = envelope.id.clone();
624 trace!("Received command: {}, id: {}, rid: {}", command, id, rid);
625 client2.parse_incomming_envelope(envelope).await;
626 })
627 .await;
628 }); } else {
630 if url.scheme() == "http" {
631 let response = Client::connect_grpc(strurl.clone()).await;
632 match response {
633 Ok(client) => {
634 self.set_client(ClientEnum::Grpc(client));
635 }
636 Err(e) => {
637 return Err(OpenIAPError::ClientError(format!(
638 "Failed to connect: {}",
639 e
640 )));
641 }
642 }
643 } else {
644 let uri = tonic::transport::Uri::builder()
645 .scheme(url.scheme())
646 .authority(url.host().unwrap().to_string())
647 .path_and_query("/")
648 .build();
649 let uri = match uri {
650 Ok(uri) => uri,
651 Err(e) => {
652 return Err(OpenIAPError::ClientError(format!(
653 "Failed to build URI: {}",
654 e
655 )));
656 }
657 };
658 let channel = Channel::builder(uri)
659 .tls_config(tonic::transport::ClientTlsConfig::new().with_native_roots());
660 let channel = match channel {
661 Ok(channel) => channel,
662 Err(e) => {
663 return Err(OpenIAPError::ClientError(format!(
664 "Failed to build channel: {}",
665 e
666 )));
667 }
668 };
669 let channel = channel.connect().await;
670 let channel = match channel {
671 Ok(channel) => channel,
672 Err(e) => {
673 return Err(OpenIAPError::ClientError(format!(
674 "Failed to connect: {}",
675 e
676 )));
677 }
678 };
679 self.set_client(ClientEnum::Grpc(FlowServiceClient::new(channel)));
680 }
681 self.set_connect_called(true);
682 self.set_config(config);
683 self.set_url(&strurl);
684 self.setup_grpc_stream().await?;
685 };
686 self.post_connected().await
687 }
688
689 #[tracing::instrument(skip_all)]
718 pub async fn new_connect(dst: &str) -> Result<Self, OpenIAPError> {
719 #[cfg(test)]
720 {
721 enable_tracing("openiap=error", "");
722 }
723 let client = Client::new();
724 client.connect_async(dst).await?;
725 Ok(client)
726 }
727 pub async fn post_connected(&self) -> Result<(), OpenIAPError> {
729 if self.get_username().is_empty() && self.get_password().is_empty() {
730 self.set_username(&std::env::var("OPENIAP_USERNAME").unwrap_or_default());
731 self.set_password(&std::env::var("OPENIAP_PASSWORD").unwrap_or_default());
732 }
733 if !self.get_username().is_empty() && !self.get_password().is_empty() {
734 debug!("Signing in with username: {}", self.get_username());
735 let signin = SigninRequest::with_userpass(self.get_username().as_str(), self.get_password().as_str());
736 let loginresponse = self.signin(signin).await;
737 match loginresponse {
738 Ok(response) => {
739 self.reset_reconnect_ms();
740 self.set_connected(ClientState::Connected, None);
741 info!("Signed in as {}", response.user.as_ref().unwrap().username);
742 Ok(())
743 }
744 Err(_e) => {
745 self.set_connected(ClientState::Disconnected, Some(&_e.to_string()));
746 Err(_e)
747 }
748 }
749 } else {
750 self.set_jwt(&std::env::var("OPENIAP_JWT").unwrap_or_default());
751 if self.get_jwt().is_empty() {
752 self.set_jwt(&std::env::var("jwt").unwrap_or_default());
753 }
754 if !self.get_jwt().is_empty() {
755 debug!("Signing in with JWT");
756 let signin = SigninRequest::with_jwt(self.get_jwt().as_str());
757 let loginresponse = self.signin(signin).await;
758 match loginresponse {
759 Ok(response) => match response.user {
760 Some(user) => {
761 self.reset_reconnect_ms();
762 info!("Signed in as {}", user.username);
763 self.set_connected(ClientState::Connected, None);
764 Ok(())
765 }
766 None => {
767 self.reset_reconnect_ms();
768 info!("Signed in as guest");
769 self.set_connected(ClientState::Connected, None);
770 Ok(())
771 }
773 },
774 Err(_e) => {
775 self.set_connected(ClientState::Disconnected, Some(&_e.to_string()));
776 Err(_e)
777 }
778 }
779 } else {
780 self.reset_reconnect_ms();
781 match self.get_element().await {
782 Ok(_) => {
783 debug!("Connected, No credentials provided so is running as guest");
784 self.set_connected(ClientState::Connected, None);
785 Ok(())
786 },
787 Err(e) => {
788 self.set_connected(ClientState::Disconnected, Some(&e.to_string()));
789 Err(e)
790 }
791 }
792 }
793 }
794 }
795 #[tracing::instrument(skip_all)]
797 pub async fn reconnect(&self) -> Result<(), OpenIAPError> {
798 let state = self.get_state();
799 if state == ClientState::Connected || state == ClientState::Signedin {
800 return Ok(());
801 }
802 if !self.is_auto_reconnect() {
803 return Ok(());
804 }
805 let client = self.get_client();
806
807 match client {
808 ClientEnum::WS(ref _client) => {
809 info!("Reconnecting to {} ({} ms)", self.get_url(), (self.get_reconnect_ms() - 500));
810 self.setup_ws(&self.get_url()).await?;
811 debug!("Completed reconnecting to websocket");
812 self.post_connected().await
813 }
814 ClientEnum::Grpc(ref _client) => {
815 info!("Reconnecting to {} ({} ms)", self.get_url(), (self.get_reconnect_ms() - 500));
816 match self.setup_grpc_stream().await {
817 Ok(_) => {
818 debug!("Completed reconnecting to gRPC");
819 self.post_connected().await
820 },
821 Err(e) => {
822 return Err(OpenIAPError::ClientError(format!(
823 "Failed to setup gRPC stream: {}",
824 e
825 )));
826 }
827 }
828 }
829 ClientEnum::None => {
830 return Err(OpenIAPError::ClientError("Invalid client".to_string()));
831 }
832 }
833 }
834 pub fn disconnect(&self) {
836 self.set_auto_reconnect(false);
837 self.set_connected(ClientState::Disconnected, Some("Disconnected"));
838 }
839 pub fn set_connected(&self, state: ClientState, message: Option<&str>) {
841 {
842 let current = self.get_state();
843 trace!("Set connected: {:?} from {:?}", state, current);
844 if state == ClientState::Connected && current == ClientState::Signedin {
845 self.set_state(ClientState::Signedin);
846 } else {
847 self.set_state(state.clone());
848 }
849 if state == ClientState::Disconnected && !current.eq(&state) {
850 let me = self.clone();
851 if let Ok(_handle) = tokio::runtime::Handle::try_current() {
852 tokio::task::spawn(async move {
853 let mut reply_queue_guard = me.rpc_reply_queue.lock().await;
854 let mut callback_guard = me.rpc_callback.lock().await;
855 *reply_queue_guard = None;
856 *callback_guard = None;
857 });
858 }
859 }
860 if state == ClientState::Connecting && !current.eq(&state) {
861 if let Ok(_handle) = tokio::runtime::Handle::try_current() {
862 self.stats.lock().unwrap().connection_attempts += 1;
863 let me = self.clone();
864 tokio::task::spawn(async move {
865 me.event_sender.send(crate::ClientEvent::Connecting).await.unwrap();
866 });
867 }
868
869 }
870 if (state == ClientState::Connected|| state == ClientState::Signedin) && (current == ClientState::Disconnected || current == ClientState::Connecting) {
871 if let Ok(_handle) = tokio::runtime::Handle::try_current() {
872 self.stats.lock().unwrap().connections += 1;
873 let me = self.clone();
874 tokio::task::spawn(async move {
875 me.event_sender.send(crate::ClientEvent::Connected).await.unwrap();
876 });
877 }
878 }
879 if state == ClientState::Signedin && current != ClientState::Signedin {
880 if let Ok(_handle) = tokio::runtime::Handle::try_current() {
881 let me = self.clone();
882 tokio::task::spawn(async move {
883 me.event_sender.send(crate::ClientEvent::SignedIn).await.unwrap();
884 });
885 }
886 }
887 if state == ClientState::Disconnected && !current.eq(&state) {
888 if message.is_some() {
889 debug!("Disconnected: {}", message.unwrap());
890 } else {
891 debug!("Disconnected");
892 }
893 if let Ok(_handle) = tokio::runtime::Handle::try_current() {
894 let me = self.clone();
895 let message = match message {
896 Some(message) => message.to_string(),
897 None => "".to_string(),
898 };
899 tokio::task::spawn(async move {
900 me.event_sender.send(crate::ClientEvent::Disconnected(message)).await.unwrap();
901 });
902 }
903
904 self.kill_handles();
905 if let Ok(_handle) = tokio::runtime::Handle::try_current() {
906 let client = self.clone();
907 tokio::task::spawn(async move {
908 {
909 let inner = client.inner.lock().await;
910 let mut queries = inner.queries.lock().await;
911 let ids = queries.keys().cloned().collect::<Vec<String>>();
912 debug!("********************************************** Cleaning up");
913 for id in ids {
914 let err = ErrorResponse {
915 code: 500,
916 message: "Disconnected".to_string(),
917 stack: "".to_string(),
918 };
919 let envelope = err.to_envelope();
920 let tx = queries.remove(&id).unwrap();
921 debug!("kill query: {}", id);
922 let _ = tx.send(envelope);
923 }
924 let mut streams = inner.streams.lock().await;
925 let ids = streams.keys().cloned().collect::<Vec<String>>();
926 for id in ids {
927 let tx = streams.remove(&id).unwrap();
928 debug!("kill stream: {}", id);
929 let _ = tx.send(Vec::new()).await;
930 }
931 let mut queues = inner.queues.lock().await;
932 let ids = queues.keys().cloned().collect::<Vec<String>>();
933 for id in ids {
934 let _ = queues.remove(&id).unwrap();
935 }
936 let mut watches = inner.watches.lock().await;
937 let ids = watches.keys().cloned().collect::<Vec<String>>();
938 for id in ids {
939 let _ = watches.remove(&id).unwrap();
940 }
941 debug!("**********************************************************");
942 }
943 if client.is_auto_reconnect() {
944 trace!("Reconnecting in {} seconds", client.get_reconnect_ms() / 1000);
945 tokio::time::sleep(Duration::from_millis(client.get_reconnect_ms() as u64)).await;
946 if client.is_auto_reconnect() {
947 client.inc_reconnect_ms();
948 trace!("Reconnecting . . .");
950 client.reconnect().await.unwrap_or_else(|e| {
951 error!("Failed to reconnect: {}", e);
952 client.set_connected(ClientState::Disconnected, Some(&e.to_string()));
953 });
954 } else {
955 debug!("Not reconnecting");
956 }
957 } else {
958 debug!("Reconnecting disabled, stop now");
959 }
960 });
961 }
962
963 }
964 }
965 }
966 pub fn get_state(&self) -> ClientState {
968 let conn = self.state.lock().unwrap();
969 conn.clone()
970 }
971 pub fn set_state(&self, state: ClientState) {
973 let mut conn = self.state.lock().unwrap();
974 *conn = state;
975 }
976 pub fn set_msgcount(&self, msgcount: i32) {
978 let mut current = self.msgcount.lock().unwrap();
979 trace!("Set msgcount: {} from {}", msgcount, *current);
980 *current = msgcount;
981 }
982 pub fn inc_msgcount(&self) -> i32 {
984 let mut current = self.msgcount.lock().unwrap();
985 *current += 1;
986 *current
987 }
988 pub fn get_reconnect_ms(&self) -> i32 {
990 let reconnect_ms = self.reconnect_ms.lock().unwrap();
991 *reconnect_ms
992 }
993 pub fn reset_reconnect_ms(&self) {
995 let mut current = self.reconnect_ms.lock().unwrap();
996 *current = 500;
997 }
998 pub fn inc_reconnect_ms(&self) -> i32 {
1000 let mut current = self.reconnect_ms.lock().unwrap();
1001 if *current < 30000 {
1002 *current += 500;
1003 }
1004 *current
1005 }
1006
1007 pub fn push_handle(&self, handle: tokio::task::JoinHandle<()>) {
1009 let mut handles = self.task_handles.lock().unwrap();
1010 handles.push(handle);
1011 }
1012 pub fn kill_handles(&self) {
1014 let mut handles = self.task_handles.lock().unwrap();
1015 for handle in handles.iter() {
1016 debug!("Killing handle");
1019 if !handle.is_finished() {
1020 handle.abort();
1021 }
1022 }
1023 handles.clear();
1024 }
1031
1032
1033 #[tracing::instrument(skip_all)]
1035 fn get_msgcount(&self) -> i32 {
1036 let msgcount = self.msgcount.lock().unwrap();
1037 *msgcount
1038 }
1039 pub fn set_default_timeout(&self, timeout: Duration) {
1041 let mut current = self.default_timeout.lock().unwrap();
1042 trace!("Set default_timeout: {} from {:?}", timeout.as_secs(), current.as_secs());
1043 *current = timeout;
1044 }
1045 pub fn get_default_timeout(&self) -> Duration {
1047 let current = self.default_timeout.lock().unwrap();
1048 current.clone()
1049 }
1050 #[tracing::instrument(skip_all)]
1052 pub fn set_connect_called(&self, connect_called: bool) {
1053 let mut current = self.connect_called.lock().unwrap();
1054 trace!("Set connect_called: {} from {}", connect_called, *current);
1055 *current = connect_called;
1056 }
1057 #[tracing::instrument(skip_all)]
1059 fn is_connect_called(&self) -> bool {
1060 let connect_called = self.connect_called.lock().unwrap();
1061 *connect_called
1062 }
1063 #[tracing::instrument(skip_all)]
1065 pub fn set_auto_reconnect(&self, auto_reconnect: bool) {
1066 let mut current = self.auto_reconnect.lock().unwrap();
1067 trace!("Set auto_reconnect: {} from {}", auto_reconnect, *current);
1068 *current = auto_reconnect;
1069 }
1070 #[tracing::instrument(skip_all)]
1072 fn is_auto_reconnect(&self) -> bool {
1073 let auto_reconnect = self.auto_reconnect.lock().unwrap();
1074 *auto_reconnect
1075 }
1076 #[tracing::instrument(skip_all)]
1078 pub fn set_url(&self, url: &str) {
1079 let mut current = self.url.lock().unwrap();
1080 trace!("Set url: {} from {}", url, *current);
1081 *current = url.to_string();
1082 }
1083 #[tracing::instrument(skip_all)]
1085 fn get_url(&self) -> String {
1086 let url = self.url.lock().unwrap();
1087 url.to_string()
1088 }
1089 #[tracing::instrument(skip_all)]
1091 pub fn set_username(&self, username: &str) {
1092 let mut current = self.username.lock().unwrap();
1093 trace!("Set username: {} from {}", username, *current);
1094 *current = username.to_string();
1095 }
1096 #[tracing::instrument(skip_all)]
1098 fn get_username(&self) -> String {
1099 let username = self.username.lock().unwrap();
1100 username.to_string()
1101 }
1102 #[tracing::instrument(skip_all)]
1104 pub fn set_password(&self, password: &str) {
1105 let mut current = self.password.lock().unwrap();
1106 trace!("Set password: {} from {}", password, *current);
1107 *current = password.to_string();
1108 }
1109 #[tracing::instrument(skip_all)]
1111 fn get_password(&self) -> String {
1112 let password = self.password.lock().unwrap();
1113 password.to_string()
1114 }
1115 #[tracing::instrument(skip_all)]
1117 pub fn set_jwt(&self, jwt: &str) {
1118 let mut current = self.jwt.lock().unwrap();
1119 trace!("Set jwt: {} from {}", jwt, *current);
1120 *current = jwt.to_string();
1121 }
1122 #[tracing::instrument(skip_all)]
1124 fn get_jwt(&self) -> String {
1125 let jwt = self.jwt.lock().unwrap();
1126 jwt.to_string()
1127 }
1128
1129 #[tracing::instrument(skip_all)]
1131 pub fn set_service_name(&self, service_name: &str) {
1132 let mut current = self.service_name.lock().unwrap();
1133 trace!("Set servicename: {} from {}", service_name, *current);
1134 *current = service_name.to_string();
1135 }
1136 #[tracing::instrument(skip_all)]
1138 pub fn get_service_name(&self) -> String {
1139 let servicename = self.service_name.lock().unwrap();
1140 servicename.to_string()
1141 }
1142 #[tracing::instrument(skip_all)]
1144 pub fn set_agent_name(&self, agent: &str) {
1145 let mut current = self.agent_name.lock().unwrap();
1146 trace!("Set agent: {} from {}", agent, *current);
1147 *current = agent.to_string();
1148 }
1149 #[tracing::instrument(skip_all)]
1151 pub fn get_agent_name(&self) -> String {
1152 let agent = self.agent_name.lock().unwrap();
1153 agent.to_string()
1154 }
1155 #[tracing::instrument(skip_all)]
1157 pub fn set_agent_version(&self, version: &str) {
1158 let mut current = self.agent_version.lock().unwrap();
1159 trace!("Set agent version: {} from {}", version, *current);
1160 *current = version.to_string();
1161 }
1162 #[tracing::instrument(skip_all)]
1164 pub fn get_agent_version(&self) -> String {
1165 let agent_version = self.agent_version.lock().unwrap();
1166 agent_version.to_string()
1167 }
1168
1169 #[tracing::instrument(skip_all)]
1171 pub fn set_config(&self, config: Option<Config>) {
1172 let mut current = self.config.lock().unwrap();
1173 *current = config;
1174 }
1175 #[tracing::instrument(skip_all)]
1177 pub fn get_config(&self) -> Option<Config> {
1178 let config = self.config.lock().unwrap();
1179 config.clone()
1180 }
1181 #[tracing::instrument(skip_all)]
1183 pub fn set_client(&self, client: ClientEnum) {
1184 let mut current = self.client.lock().unwrap();
1185 *current = client;
1186 }
1187 #[tracing::instrument(skip_all)]
1189 fn get_client(&self) -> ClientEnum {
1190 let client = self.client.lock().unwrap();
1191 client.clone()
1192 }
1193 #[tracing::instrument(skip_all)]
1195 pub fn set_user(&self, user: Option<User>) {
1196 let mut current = self.user.lock().unwrap();
1197 *current = user;
1198 }
1199 #[tracing::instrument(skip_all)]
1201 pub fn get_user(&self) -> Option<User> {
1202 let user = self.user.lock().unwrap();
1203 user.clone()
1204 }
1205 #[tracing::instrument(skip_all)]
1215 pub fn set_runtime(&self, runtime: Option<tokio::runtime::Runtime>) {
1216 let mut current = self.runtime.lock().unwrap();
1217 *current = runtime;
1218 }
1219 #[tracing::instrument(skip_all)]
1221 pub fn get_runtime(&self) -> &std::sync::Mutex<std::option::Option<tokio::runtime::Runtime>> {
1223 self.runtime.as_ref()
1224 }
1225 #[tracing::instrument(skip_all)]
1227 pub fn get_runtime_handle(&self) -> tokio::runtime::Handle {
1228 let mut rt = self.runtime.lock().unwrap();
1229 if rt.is_none() {
1230 let runtime = tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime");
1232 *rt = Some(runtime);
1233 } else {
1234 }
1236 rt.as_ref().unwrap().handle().clone()
1237 }
1238 #[tracing::instrument(skip_all)]
1240 pub async fn on_event(&self, callback: Box<dyn Fn(ClientEvent) + Send + Sync>)
1241 {
1242 let event_receiver = self.event_receiver.clone();
1244 let callback = callback;
1245 let _handle = tokio::task::spawn(async move {
1246 while let Ok(event) = event_receiver.recv().await {
1247 callback(event);
1248 }
1249 }); }
1251 #[tracing::instrument(skip_all)]
1253 pub fn get_uniqueid() -> String {
1254 static COUNTER: AtomicUsize = AtomicUsize::new(1);
1255 let num1 = COUNTER.fetch_add(1, Ordering::Relaxed) as u64;
1256 let num2 = COUNTER.fetch_add(1, Ordering::Relaxed) as u64;
1257 let num3 = COUNTER.fetch_add(1, Ordering::Relaxed) as u64;
1258 let sqids = Sqids::default();
1259 sqids.encode(&[num1, num2, num3 ]).unwrap().to_string()
1260 }
1261 #[tracing::instrument(skip_all)]
1263 async fn send(&self, msg: Envelope, timeout: Option<tokio::time::Duration>) -> Result<Envelope, OpenIAPError> {
1264 let response = self.send_noawait(msg).await;
1265 match response {
1266 Ok((response_rx, id)) => {
1267 let timeout = match timeout {
1268 Some(t) => t,
1269 None => self.get_default_timeout()
1270 };
1271 let result = tokio::time::timeout(timeout, response_rx).await;
1272 let inner = self.inner.lock().await;
1274 inner.queries.lock().await.remove(&id);
1275
1276 match result {
1277 Ok(Ok(response)) => Ok(response),
1278 Ok(Err(e)) => Err(OpenIAPError::CustomError(e.to_string())),
1279 Err(_) => Err(OpenIAPError::ClientError("Request timed out".to_string())),
1280 }
1281 }
1294 Err(e) => Err(OpenIAPError::CustomError(e.to_string())),
1295 }
1296 }
1297 #[tracing::instrument(skip_all)]
1300 async fn send_noawait(
1301 &self,
1302 mut msg: Envelope,
1303 ) -> Result<(oneshot::Receiver<Envelope>, String), OpenIAPError> {
1304 let (response_tx, response_rx) = oneshot::channel();
1305 let id = Client::get_uniqueid();
1306 msg.id = id.clone();
1307
1308 {
1310 let inner = self.inner.lock().await;
1311 inner.queries.lock().await.insert(id.clone(), response_tx);
1312 }
1313
1314 let res = self.send_envelope(msg).await;
1316 if let Err(e) = res {
1317 let inner = self.inner.lock().await;
1319 inner.queries.lock().await.remove(&id);
1320 return Err(OpenIAPError::ClientError(e.to_string()));
1321 }
1322
1323 Ok((response_rx, id))
1324 }
1325 #[tracing::instrument(skip_all)]
1327 async fn sendwithstream(
1328 &self,
1329 mut msg: Envelope,
1330 ) -> Result<(oneshot::Receiver<Envelope>, mpsc::Receiver<Vec<u8>>), OpenIAPError> {
1331 let (response_tx, response_rx) = oneshot::channel();
1332 let (stream_tx, stream_rx) = mpsc::channel(1024 * 1024);
1333 let id = Client::get_uniqueid();
1334 msg.id = id.clone();
1335 {
1336 let inner = self.inner.lock().await;
1337 inner.queries.lock().await.insert(id.clone(), response_tx);
1338 inner.streams.lock().await.insert(id.clone(), stream_tx);
1339 let res = self.send_envelope(msg).await;
1340 match res {
1341 Ok(_) => (),
1342 Err(e) => return Err(OpenIAPError::ClientError(e.to_string())),
1343 }
1344 }
1345 Ok((response_rx, stream_rx))
1346 }
1347 #[tracing::instrument(skip_all, target = "openiap::client")]
1348 async fn send_envelope(&self, mut envelope: Envelope) -> Result<(), OpenIAPError> {
1349 if (self.get_state() != ClientState::Connected && self.get_state() != ClientState::Signedin )
1350 && envelope.command != "signin" && envelope.command != "getelement" && envelope.command != "pong" {
1351 return Err(OpenIAPError::ClientError(format!("Not connected ( {:?} )", self.get_state())));
1352 }
1353 let env = envelope.clone();
1354 let command = envelope.command.clone();
1355 self.stats.lock().unwrap().package_tx += 1;
1356 match command.as_str() {
1357 "signin" => { self.stats.lock().unwrap().signin += 1;},
1358 "upload" => { self.stats.lock().unwrap().upload += 1;},
1359 "download" => { self.stats.lock().unwrap().download += 1;},
1360 "getdocumentversion" => { self.stats.lock().unwrap().getdocumentversion += 1;},
1361 "customcommand" => { self.stats.lock().unwrap().customcommand += 1;},
1362 "listcollections" => { self.stats.lock().unwrap().listcollections += 1;},
1363 "createcollection" => { self.stats.lock().unwrap().createcollection += 1;},
1364 "dropcollection" => { self.stats.lock().unwrap().dropcollection += 1;},
1365 "ensurecustomer" => { self.stats.lock().unwrap().ensurecustomer += 1;},
1366 "invokeopenrpa" => { self.stats.lock().unwrap().invokeopenrpa += 1;},
1367
1368 "registerqueue" => { self.stats.lock().unwrap().registerqueue += 1;},
1369 "registerexchange" => { self.stats.lock().unwrap().registerexchange += 1;},
1370 "unregisterqueue" => { self.stats.lock().unwrap().unregisterqueue += 1;},
1371 "watch" => { self.stats.lock().unwrap().watch += 1;},
1372 "unwatch" => { self.stats.lock().unwrap().unwatch += 1;},
1373 "queuemessage" => { self.stats.lock().unwrap().queuemessage += 1;},
1374
1375 "pushworkitem" => { self.stats.lock().unwrap().pushworkitem += 1;},
1376 "pushworkitems" => { self.stats.lock().unwrap().pushworkitems += 1;},
1377 "popworkitem" => { self.stats.lock().unwrap().popworkitem += 1;},
1378 "updateworkitem" => { self.stats.lock().unwrap().updateworkitem += 1;},
1379 "deleteworkitem" => { self.stats.lock().unwrap().deleteworkitem += 1;},
1380 "addworkitemqueue" => { self.stats.lock().unwrap().addworkitemqueue += 1;},
1381 "updateworkitemqueue" => { self.stats.lock().unwrap().updateworkitemqueue += 1;},
1382 "deleteworkitemqueue" => { self.stats.lock().unwrap().deleteworkitemqueue += 1;},
1383
1384 "getindexes" => { self.stats.lock().unwrap().getindexes += 1;},
1385 "createindex" => { self.stats.lock().unwrap().createindex += 1;},
1386 "dropindex" => { self.stats.lock().unwrap().dropindex += 1;},
1387 "query" => { self.stats.lock().unwrap().query += 1;},
1388 "count" => { self.stats.lock().unwrap().count += 1;},
1389 "distinct" => { self.stats.lock().unwrap().distinct += 1;},
1390 "aggregate" => { self.stats.lock().unwrap().aggregate += 1;},
1391 "insertone" => { self.stats.lock().unwrap().insertone += 1;},
1392 "insertmany" => { self.stats.lock().unwrap().insertmany += 1;},
1393 "updateone" => { self.stats.lock().unwrap().updateone += 1;},
1394 "insertorupdateone" => { self.stats.lock().unwrap().insertorupdateone += 1;},
1395 "insertorupdatemany" => { self.stats.lock().unwrap().insertorupdatemany += 1;},
1396 "updatedocument" => { self.stats.lock().unwrap().updatedocument += 1;},
1397 "deleteone" => { self.stats.lock().unwrap().deleteone += 1;},
1398 "deletemany" => { self.stats.lock().unwrap().deletemany += 1;},
1399 _ => {}
1400 };
1401 if envelope.id.is_empty() {
1402 let id = Client::get_uniqueid();
1403 envelope.id = id.clone();
1404 }
1405 trace!("Sending {} message, in the thread", command);
1406 let res = self.out_envelope_sender.send(env).await;
1407 if res.is_err() {
1408 error!("{:?}", res);
1409 let errmsg = res.unwrap_err().to_string();
1410 self.set_connected(ClientState::Disconnected, Some(&errmsg));
1411 return Err(OpenIAPError::ClientError(format!("Failed to send data: {}", errmsg)))
1412 } else {
1413 return Ok(())
1414 }
1415 }
1416 #[tracing::instrument(skip_all, target = "openiap::client")]
1417 async fn parse_incomming_envelope(&self, received: Envelope) {
1418 self.stats.lock().unwrap().package_rx += 1;
1419 let command = received.command.clone();
1420 trace!("parse_incomming_envelope, command: {}", command);
1421 let inner = self.inner.lock().await;
1422 let rid = received.rid.clone();
1423 let mut queries = inner.queries.lock().await;
1424 let mut streams = inner.streams.lock().await;
1425 let watches = inner.watches.lock().await;
1426 let queues = inner.queues.lock().await;
1427
1428 if command != "ping" && command != "pong" && command != "refreshtoken" {
1429 if rid.is_empty() {
1430 debug!("Received #{} #{} {} message", received.seq, received.id, command);
1431 } else {
1432 debug!("Received #{} #{} (reply to #{}) {} message", received.seq, received.id, rid, command);
1433 }
1434 } else if rid.is_empty() {
1435 trace!("Received #{} #{} {} message", received.seq, received.id, command);
1436 } else {
1437 trace!("Received #{} #{} (reply to #{}) {} message", received.seq, received.id, rid, command);
1438 }
1439
1440 if command == "ping" {
1441 self.pong(&received.id).await;
1442 } else if command == "refreshtoken" {
1444 } else if command == "beginstream"
1446 || command == "stream"
1447 || command == "endstream"
1448 {
1449 let streamresponse: Stream =
1450 prost::Message::decode(received.data.unwrap().value.as_ref()).unwrap();
1451 let streamdata = streamresponse.data;
1452 if !streamdata.is_empty() {
1453 let stream = streams.get(rid.as_str()).unwrap();
1454
1455 match stream.send(streamdata).await {
1456 Ok(_) => _ = (),
1457 Err(e) => error!("Failed to send data: {}", e),
1458 }
1459 }
1460 if command == "endstream" {
1461 let _ = streams.remove(rid.as_str());
1462 }
1463 } else if command == "watchevent" {
1464 let watchevent: WatchEvent =
1465 prost::Message::decode(received.data.unwrap().value.as_ref()).unwrap();
1466 if let Some(callback) = watches.get(watchevent.id.as_str()) {
1467 callback(watchevent);
1468 }
1469 } else if command == "queueevent" {
1470 let queueevent: QueueEvent =
1471 prost::Message::decode(received.data.unwrap().value.as_ref()).unwrap();
1472 if let Some(callback) = queues.get(queueevent.queuename.as_str()).cloned() {
1473 let queuename = queueevent.replyto.clone();
1474 let correlation_id = queueevent.correlation_id.clone();
1475 let me = self.clone();
1476 tokio::spawn(async move {
1477 let result_fut = callback(Arc::new(me.clone()), queueevent);
1478 let result = result_fut.await;
1479 if result.is_some() && !queuename.is_empty() {
1480 debug!("Sending return value from queue event callback to {}", queuename);
1481 let result = result.unwrap();
1482 let q = QueueMessageRequest {
1483 queuename,
1484 correlation_id,
1485 data: result,
1486 striptoken: true,
1487 ..Default::default()
1488 };
1489 let e = q.to_envelope();
1490 let send_result = me.send(e, None).await;
1491 if let Err(e) = send_result {
1492 error!("Failed to send queue event response: {}", e);
1493 }
1494 }
1495 });
1496 }
1497 } else if let Some(response_tx) = queries.remove(&rid) {
1498 let stream = streams.get(rid.as_str());
1499 if let Some(stream) = stream {
1500 let streamdata = vec![];
1501 match stream.send(streamdata).await {
1502 Ok(_) => _ = (),
1503 Err(e) => error!("Failed to send data: {}", e),
1504 }
1505 }
1506 let _ = response_tx.send(received);
1507 } else {
1508 error!("Received unhandled {} message: {:?}", command, received);
1509 }
1510 }
1511 #[tracing::instrument(skip_all)]
1513 async fn get_element(&self) -> Result<(), OpenIAPError> {
1514 let id = Client::get_uniqueid();
1515 let envelope = Envelope {
1516 id: id.clone(),
1517 command: "getelement".into(),
1518 ..Default::default()
1519 };
1520 let result = match self.send(envelope, None).await {
1521 Ok(res) => res,
1522 Err(e) => {
1523 return Err(e);
1524 },
1525 };
1526 if result.command == "pong" || result.command == "getelement" {
1527 Ok(())
1528 } else if result.command == "error" {
1529 let e: ErrorResponse = prost::Message::decode(result.data.unwrap().value.as_ref()).unwrap();
1530 Err(OpenIAPError::ServerError(e.message))
1531 } else {
1532 Err(OpenIAPError::ClientError("Failed to receive getelement".to_string()))
1533 }
1534 }
1535 #[tracing::instrument(skip_all)]
1537 async fn ping(&self) -> Result<(), OpenIAPError> {
1538 let id = Client::get_uniqueid();
1539 let envelope = Envelope {
1540 id: id.clone(),
1541 command: "getelement".into(),
1542 ..Default::default()
1543 };
1544 match self.send_envelope(envelope).await {
1545 Ok(_res) => Ok(()),
1546 Err(e) => {
1547 return Err(e);
1548 },
1549 }
1550 }
1551 #[tracing::instrument(skip_all)]
1553 async fn pong(&self, rid: &str) {
1554 let id = Client::get_uniqueid();
1555 let envelope = Envelope {
1556 id: id.clone(),
1557 command: "pong".into(),
1558 rid: rid.to_string(),
1559 ..Default::default()
1560 };
1561 match self.send_envelope(envelope).await {
1562 Ok(_) => (),
1563 Err(e) => error!("Failed to send pong: {}", e),
1564 }
1565 }
1566 #[tracing::instrument(skip_all)]
1572 pub async fn signin(&self, mut config: SigninRequest) -> Result<SigninResponse, OpenIAPError> {
1573 if config.username.is_empty() && config.password.is_empty() && config.jwt.is_empty() {
1575 if config.jwt.is_empty() {
1576 config.jwt = std::env::var("OPENIAP_JWT").unwrap_or_default();
1577 }
1578 if config.jwt.is_empty() {
1579 config.jwt = std::env::var("jwt").unwrap_or_default();
1580 }
1581 if config.jwt.is_empty() {
1583 if config.username.is_empty() {
1584 config.username = std::env::var("OPENIAP_USERNAME").unwrap_or_default();
1585 }
1586 if config.password.is_empty() {
1587 config.password = std::env::var("OPENIAP_PASSWORD").unwrap_or_default();
1588 }
1589 }
1590 }
1591 let version = env!("CARGO_PKG_VERSION");
1592 if !version.is_empty() && config.version.is_empty() {
1593 config.version = version.to_string();
1594 }
1595 if config.agent.is_empty() {
1596 config.agent = self.get_agent_name();
1597 }
1598
1599 let envelope = config.to_envelope();
1601 let result = self.send(envelope, None).await;
1602
1603 match &result {
1604 Ok(m) => {
1605 debug!("Sign-in reply received");
1606 if m.command == "error" {
1607 let e: ErrorResponse =
1608 prost::Message::decode(m.data.as_ref().unwrap().value.as_ref())
1609 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1610 return Err(OpenIAPError::ServerError(e.message));
1611 }
1612 debug!("Sign-in successful");
1613 let response: SigninResponse =
1614 prost::Message::decode(m.data.as_ref().unwrap().value.as_ref())
1615 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1616 if !config.validateonly {
1617 self.set_connected(ClientState::Signedin, None);
1618 self.set_user(Some(response.user.as_ref().unwrap().clone()));
1619 }
1620 Ok(response)
1621 }
1622 Err(e) => {
1623 debug!("Sending Sign-in request failed {:?}", result);
1624 debug!("Sign-in failed: {}", e.to_string());
1625 if !config.validateonly {
1626 self.set_user(None);
1627 }
1628 Err(OpenIAPError::ClientError(e.to_string()))
1629 }
1630 }
1631 }
1632 #[tracing::instrument(skip_all)]
1636 pub async fn list_collections(&self, includehist: bool, env: EnvConfig) -> Result<String, OpenIAPError> {
1637 let config = ListCollectionsRequest::new(includehist);
1638 let mut envelope = config.to_envelope();
1639 if !env.jwt.is_empty() {
1640 envelope.jwt = env.jwt;
1641 }
1642 if !env.spanid.is_empty() {
1643 envelope.spanid = env.spanid;
1644 }
1645 if !env.traceid.is_empty() {
1646 envelope.traceid = env.traceid;
1647 }
1648 let result = self.send(envelope, None).await;
1649 match result {
1650 Ok(m) => {
1651 let data = match m.data {
1652 Some(data) => data,
1653 None => {
1654 return Err(OpenIAPError::ClientError("No data returned".to_string()));
1655 }
1656 };
1657 if m.command == "error" {
1658 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1659 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1660 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1661 }
1662 let response: ListCollectionsResponse = prost::Message::decode(data.value.as_ref())
1663 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1664 Ok(response.results)
1665 }
1666 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1667 }
1668 }
1669 #[tracing::instrument(skip_all)]
1721 pub async fn create_collection(
1722 &self,
1723 config: CreateCollectionRequest,
1724 env: EnvConfig,
1725 ) -> Result<(), OpenIAPError> {
1726 if config.collectionname.is_empty() {
1727 return Err(OpenIAPError::ClientError(
1728 "No collection name provided".to_string(),
1729 ));
1730 }
1731 let mut envelope = config.to_envelope();
1732 if !env.jwt.is_empty() {
1733 envelope.jwt = env.jwt;
1734 }
1735 if !env.spanid.is_empty() {
1736 envelope.spanid = env.spanid;
1737 }
1738 if !env.traceid.is_empty() {
1739 envelope.traceid = env.traceid;
1740 }
1741 let result = self.send(envelope, None).await;
1742 match result {
1743 Ok(m) => {
1744 let data = match m.data {
1745 Some(data) => data,
1746 None => {
1747 return Err(OpenIAPError::ClientError("No data returned".to_string()));
1748 }
1749 };
1750 if m.command == "error" {
1751 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1752 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1753 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1754 }
1755 Ok(())
1756 }
1757 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1758 }
1759 }
1760 #[tracing::instrument(skip_all)]
1763 pub async fn drop_collection(&self, config: DropCollectionRequest, env: EnvConfig) -> Result<(), OpenIAPError> {
1764 if config.collectionname.is_empty() {
1765 return Err(OpenIAPError::ClientError(
1766 "No collection name provided".to_string(),
1767 ));
1768 }
1769 let mut envelope = config.to_envelope();
1770 if !env.jwt.is_empty() {
1771 envelope.jwt = env.jwt;
1772 }
1773 if !env.spanid.is_empty() {
1774 envelope.spanid = env.spanid;
1775 }
1776 if !env.traceid.is_empty() {
1777 envelope.traceid = env.traceid;
1778 }
1779 let result = self.send(envelope, None).await;
1780 match result {
1781 Ok(m) => {
1782 let data = match m.data {
1783 Some(data) => data,
1784 None => {
1785 return Err(OpenIAPError::ClientError("No data returned".to_string()));
1786 }
1787 };
1788 if m.command == "error" {
1789 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1790 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1791 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1792 }
1793 Ok(())
1794 }
1795 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1796 }
1797 }
1798 pub async fn get_indexes(&self, config: GetIndexesRequest, env: EnvConfig) -> Result<String, OpenIAPError> {
1812 if config.collectionname.is_empty() {
1813 return Err(OpenIAPError::ClientError(
1814 "No collection name provided".to_string(),
1815 ));
1816 }
1817 let mut envelope = config.to_envelope();
1818 if !env.jwt.is_empty() {
1819 envelope.jwt = env.jwt;
1820 }
1821 if !env.spanid.is_empty() {
1822 envelope.spanid = env.spanid;
1823 }
1824 if !env.traceid.is_empty() {
1825 envelope.traceid = env.traceid;
1826 }
1827 let result = self.send(envelope, None).await;
1828 match result {
1829 Ok(m) => {
1830 let data = match m.data {
1831 Some(data) => data,
1832 None => {
1833 return Err(OpenIAPError::ClientError("No data returned".to_string()));
1834 }
1835 };
1836 if m.command == "error" {
1837 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1838 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1839 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1840 }
1841 let response: GetIndexesResponse = prost::Message::decode(data.value.as_ref())
1842 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1843 Ok(response.results)
1844 }
1845 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1846 }
1847 }
1848 pub async fn create_index(&self, config: CreateIndexRequest, env: EnvConfig) -> Result<(), OpenIAPError> {
1889 if config.collectionname.is_empty() {
1890 return Err(OpenIAPError::ClientError(
1891 "No collection name provided".to_string(),
1892 ));
1893 }
1894 if config.index.is_empty() {
1895 return Err(OpenIAPError::ClientError(
1896 "No index was provided".to_string(),
1897 ));
1898 }
1899 let mut envelope = config.to_envelope();
1900 if !env.jwt.is_empty() {
1901 envelope.jwt = env.jwt;
1902 }
1903 if !env.spanid.is_empty() {
1904 envelope.spanid = env.spanid;
1905 }
1906 if !env.traceid.is_empty() {
1907 envelope.traceid = env.traceid;
1908 }
1909 let result = self.send(envelope, None).await;
1910 match result {
1911 Ok(m) => {
1912 let data = match m.data {
1913 Some(data) => data,
1914 None => {
1915 return Err(OpenIAPError::ClientError("No data returned".to_string()));
1916 }
1917 };
1918 if m.command == "error" {
1919 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1920 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1921 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1922 }
1923 Ok(())
1924 }
1925 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1926 }
1927 }
1928 pub async fn drop_index(&self, config: DropIndexRequest, env: EnvConfig) -> Result<(), OpenIAPError> {
1931 if config.collectionname.is_empty() {
1932 return Err(OpenIAPError::ClientError(
1933 "No collection name provided".to_string(),
1934 ));
1935 }
1936 if config.name.is_empty() {
1937 return Err(OpenIAPError::ClientError(
1938 "No index name provided".to_string(),
1939 ));
1940 }
1941 let mut envelope = config.to_envelope();
1942 if !env.jwt.is_empty() {
1943 envelope.jwt = env.jwt;
1944 }
1945 if !env.spanid.is_empty() {
1946 envelope.spanid = env.spanid;
1947 }
1948 if !env.traceid.is_empty() {
1949 envelope.traceid = env.traceid;
1950 }
1951 let result = self.send(envelope, None).await;
1952 match result {
1953 Ok(m) => {
1954 let data = match m.data {
1955 Some(data) => data,
1956 None => {
1957 return Err(OpenIAPError::ClientError("No data returned".to_string()));
1958 }
1959 };
1960 if m.command == "error" {
1961 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1962 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1963 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1964 }
1965 Ok(())
1966 }
1967 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1968 }
1969 }
1970 #[tracing::instrument(skip_all)]
2008 pub async fn query(&self, mut config: QueryRequest, env: EnvConfig) -> Result<QueryResponse, OpenIAPError> {
2009 if config.collectionname.is_empty() {
2010 config.collectionname = "entities".to_string();
2011 }
2012 let mut envelope = config.to_envelope();
2013 if !env.jwt.is_empty() {
2014 envelope.jwt = env.jwt;
2015 }
2016 if !env.spanid.is_empty() {
2017 envelope.spanid = env.spanid;
2018 }
2019 if !env.traceid.is_empty() {
2020 envelope.traceid = env.traceid;
2021 }
2022 let result = self.send(envelope, None).await;
2023 match result {
2024 Ok(m) => {
2025 let data = match m.data {
2026 Some(data) => data,
2027 None => {
2028 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2029 }
2030 };
2031 if m.command == "error" {
2032 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2033 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2034 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2035 }
2036 let response: QueryResponse = prost::Message::decode(data.value.as_ref())
2037 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2038 debug!("Return Ok(response)");
2039 Ok(response)
2040 }
2041 Err(e) => {
2042 debug!("Error !!");
2043 Err(OpenIAPError::ClientError(e.to_string()))
2044 }
2045 }
2046 }
2047 #[tracing::instrument(skip_all)]
2073 pub async fn get_one(&self, mut config: QueryRequest, env: EnvConfig) -> Option<serde_json::Value> {
2074 if config.collectionname.is_empty() {
2075 config.collectionname = "entities".to_string();
2076 }
2077 config.top = 1;
2078 let mut envelope = config.to_envelope();
2079 if !env.jwt.is_empty() {
2080 envelope.jwt = env.jwt;
2081 }
2082 if !env.spanid.is_empty() {
2083 envelope.spanid = env.spanid;
2084 }
2085 if !env.traceid.is_empty() {
2086 envelope.traceid = env.traceid;
2087 }
2088 let result = self.send(envelope, None).await;
2089 match result {
2090 Ok(m) => {
2091 let data = match m.data {
2092 Some(data) => data,
2093 None => return None,
2094 };
2095 if m.command == "error" {
2096 return None;
2097 }
2098 let response: QueryResponse = prost::Message::decode(data.value.as_ref()).ok()?;
2099
2100 let items: serde_json::Value = serde_json::from_str(&response.results).unwrap();
2101 let items: &Vec<serde_json::Value> = items.as_array().unwrap();
2102 if items.is_empty() {
2103 return None;
2104 }
2105 let item = items[0].clone();
2106 Some(item)
2107 }
2108 Err(_) => None,
2109 }
2110 }
2111
2112 #[tracing::instrument(skip_all)]
2222 pub async fn get_document_version(
2223 &self,
2224 mut config: GetDocumentVersionRequest,
2225 env: EnvConfig,
2226 ) -> Result<String, OpenIAPError> {
2227 if config.collectionname.is_empty() {
2228 config.collectionname = "entities".to_string();
2229 }
2230 if config.id.is_empty() {
2231 return Err(OpenIAPError::ClientError("No id provided".to_string()));
2232 }
2233 let mut envelope = config.to_envelope();
2234 if !env.jwt.is_empty() {
2235 envelope.jwt = env.jwt;
2236 }
2237 if !env.spanid.is_empty() {
2238 envelope.spanid = env.spanid;
2239 }
2240 if !env.traceid.is_empty() {
2241 envelope.traceid = env.traceid;
2242 }
2243 let result = self.send(envelope, None).await;
2244 match result {
2245 Ok(m) => {
2246 let data = match m.data {
2247 Some(data) => data,
2248 None => {
2249 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2250 }
2251 };
2252 if m.command == "error" {
2253 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2254 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2255 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2256 }
2257 let response: GetDocumentVersionResponse =
2258 prost::Message::decode(data.value.as_ref())
2259 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2260 Ok(response.result)
2261 }
2262 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2263 }
2264 }
2265 #[tracing::instrument(skip_all)]
2284 pub async fn aggregate(
2285 &self,
2286 mut config: AggregateRequest,
2287 env: EnvConfig,
2288 ) -> Result<AggregateResponse, OpenIAPError> {
2289 if config.collectionname.is_empty() {
2290 config.collectionname = "entities".to_string();
2291 }
2292 if config.hint.is_empty() {
2293 config.hint = "".to_string();
2294 }
2295 if config.queryas.is_empty() {
2296 config.queryas = "".to_string();
2297 }
2298 if config.aggregates.is_empty() {
2299 return Err(OpenIAPError::ClientError(
2300 "No aggregates provided".to_string(),
2301 ));
2302 }
2303 let mut envelope = config.to_envelope();
2304 if !env.jwt.is_empty() {
2305 envelope.jwt = env.jwt;
2306 }
2307 if !env.spanid.is_empty() {
2308 envelope.spanid = env.spanid;
2309 }
2310 if !env.traceid.is_empty() {
2311 envelope.traceid = env.traceid;
2312 }
2313 let result = self.send(envelope, None).await;
2314 match result {
2315 Ok(m) => {
2316 let data = match m.data {
2317 Some(data) => data,
2318 None => {
2319 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2320 }
2321 };
2322 if m.command == "error" {
2323 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2324 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2325 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2326 }
2327 let response: AggregateResponse = prost::Message::decode(data.value.as_ref())
2328 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2329 Ok(response)
2330 }
2331 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2332 }
2333 }
2334 #[tracing::instrument(skip_all)]
2336 pub async fn count(&self, mut config: CountRequest, env: EnvConfig) -> Result<CountResponse, OpenIAPError> {
2337 if config.collectionname.is_empty() {
2338 config.collectionname = "entities".to_string();
2339 }
2340 if config.query.is_empty() {
2341 config.query = "{}".to_string();
2342 }
2343 let mut envelope = config.to_envelope();
2344 if !env.jwt.is_empty() {
2345 envelope.jwt = env.jwt;
2346 }
2347 if !env.spanid.is_empty() {
2348 envelope.spanid = env.spanid;
2349 }
2350 if !env.traceid.is_empty() {
2351 envelope.traceid = env.traceid;
2352 }
2353 let result = self.send(envelope, None).await;
2354 match result {
2355 Ok(m) => {
2356 let data = match m.data {
2357 Some(data) => data,
2358 None => {
2359 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2360 }
2361 };
2362 if m.command == "error" {
2363 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2364 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2365 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2366 }
2367 let response: CountResponse = prost::Message::decode(data.value.as_ref())
2368 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2369 Ok(response)
2370 }
2371 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2372 }
2373 }
2374 #[tracing::instrument(skip_all)]
2376 pub async fn distinct(
2377 &self,
2378 mut config: DistinctRequest,
2379 env: EnvConfig,
2380 ) -> Result<DistinctResponse, OpenIAPError> {
2381 if config.collectionname.is_empty() {
2382 config.collectionname = "entities".to_string();
2383 }
2384 if config.query.is_empty() {
2385 config.query = "{}".to_string();
2386 }
2387 if config.field.is_empty() {
2388 return Err(OpenIAPError::ClientError("No field provided".to_string()));
2389 }
2390 let mut envelope = config.to_envelope();
2391 if !env.jwt.is_empty() {
2392 envelope.jwt = env.jwt;
2393 }
2394 if !env.spanid.is_empty() {
2395 envelope.spanid = env.spanid;
2396 }
2397 if !env.traceid.is_empty() {
2398 envelope.traceid = env.traceid;
2399 }
2400 let result = self.send(envelope, None).await;
2401 match result {
2402 Ok(m) => {
2403 let data = match m.data {
2404 Some(data) => data,
2405 None => {
2406 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2407 }
2408 };
2409 if m.command == "error" {
2410 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2411 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2412 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2413 }
2414 let response: DistinctResponse = prost::Message::decode(data.value.as_ref())
2415 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2416 Ok(response)
2417 }
2418 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2419 }
2420 }
2421 #[tracing::instrument(skip_all)]
2423 pub async fn insert_one(
2424 &self,
2425 config: InsertOneRequest,
2426 env: EnvConfig,
2427 ) -> Result<InsertOneResponse, OpenIAPError> {
2428 let mut envelope = config.to_envelope();
2429 if !env.jwt.is_empty() {
2430 envelope.jwt = env.jwt;
2431 }
2432 if !env.spanid.is_empty() {
2433 envelope.spanid = env.spanid;
2434 }
2435 if !env.traceid.is_empty() {
2436 envelope.traceid = env.traceid;
2437 }
2438 let result = self.send(envelope, None).await;
2439 match result {
2440 Ok(m) => {
2441 let data = match m.data {
2442 Some(data) => data,
2443 None => {
2444 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2445 }
2446 };
2447 if m.command == "error" {
2448 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2449 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2450 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2451 }
2452 let response: InsertOneResponse = prost::Message::decode(data.value.as_ref())
2453 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2454 Ok(response)
2455 }
2456 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2457 }
2458 }
2459 #[tracing::instrument(skip_all)]
2461 pub async fn insert_many(
2462 &self,
2463 config: InsertManyRequest,
2464 env: EnvConfig,
2465 ) -> Result<InsertManyResponse, OpenIAPError> {
2466 let mut envelope = config.to_envelope();
2467 if !env.jwt.is_empty() {
2468 envelope.jwt = env.jwt;
2469 }
2470 if !env.spanid.is_empty() {
2471 envelope.spanid = env.spanid;
2472 }
2473 if !env.traceid.is_empty() {
2474 envelope.traceid = env.traceid;
2475 }
2476 let result = self.send(envelope, None).await;
2477 match result {
2478 Ok(m) => {
2479 let data = match m.data {
2480 Some(data) => data,
2481 None => {
2482 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2483 }
2484 };
2485 if m.command == "error" {
2486 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2487 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2488 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2489 }
2490 let response: InsertManyResponse = prost::Message::decode(data.value.as_ref())
2491 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2492 Ok(response)
2493 }
2494 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2495 }
2496 }
2497 #[tracing::instrument(skip_all)]
2499 pub async fn update_one(
2500 &self,
2501 config: UpdateOneRequest,
2502 env: EnvConfig,
2503 ) -> Result<UpdateOneResponse, OpenIAPError> {
2504 let mut envelope = config.to_envelope();
2505 if !env.jwt.is_empty() {
2506 envelope.jwt = env.jwt;
2507 }
2508 if !env.spanid.is_empty() {
2509 envelope.spanid = env.spanid;
2510 }
2511 if !env.traceid.is_empty() {
2512 envelope.traceid = env.traceid;
2513 }
2514 let result = self.send(envelope, None).await;
2515 match result {
2516 Ok(m) => {
2517 let data = match m.data {
2518 Some(data) => data,
2519 None => {
2520 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2521 }
2522 };
2523 if m.command == "error" {
2524 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2525 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2526 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2527 }
2528 let response: UpdateOneResponse = prost::Message::decode(data.value.as_ref())
2529 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2530 Ok(response)
2531 }
2532 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2533 }
2534 }
2535 #[tracing::instrument(skip_all)]
2537 pub async fn insert_or_update_one(
2538 &self,
2539 config: InsertOrUpdateOneRequest,
2540 env: EnvConfig,
2541 ) -> Result<String, OpenIAPError> {
2542 let mut envelope = config.to_envelope();
2543 if !env.jwt.is_empty() {
2544 envelope.jwt = env.jwt;
2545 }
2546 if !env.spanid.is_empty() {
2547 envelope.spanid = env.spanid;
2548 }
2549 if !env.traceid.is_empty() {
2550 envelope.traceid = env.traceid;
2551 }
2552 let result = self.send(envelope, None).await;
2553 match result {
2554 Ok(m) => {
2555 let data = match m.data {
2556 Some(data) => data,
2557 None => {
2558 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2559 }
2560 };
2561 if m.command == "error" {
2562 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2563 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2564 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2565 }
2566 let response: InsertOrUpdateOneResponse =
2567 prost::Message::decode(data.value.as_ref())
2568 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2569 Ok(response.result)
2570 }
2571 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2572 }
2573 }
2574 #[tracing::instrument(skip_all)]
2576 pub async fn insert_or_update_many(
2577 &self,
2578 config: InsertOrUpdateManyRequest,
2579 env: EnvConfig,
2580 ) -> Result<InsertOrUpdateManyResponse, OpenIAPError> {
2581 let mut envelope = config.to_envelope();
2582 if !env.jwt.is_empty() {
2583 envelope.jwt = env.jwt;
2584 }
2585 if !env.spanid.is_empty() {
2586 envelope.spanid = env.spanid;
2587 }
2588 if !env.traceid.is_empty() {
2589 envelope.traceid = env.traceid;
2590 }
2591 let result = self.send(envelope, None).await;
2592 match result {
2593 Ok(m) => {
2594 let data = match m.data {
2595 Some(data) => data,
2596 None => {
2597 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2598 }
2599 };
2600 if m.command == "error" {
2601 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2602 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2603 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2604 }
2605 let response: InsertOrUpdateManyResponse =
2606 prost::Message::decode(data.value.as_ref())
2607 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2608 Ok(response)
2609 }
2610 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2611 }
2612 }
2613 #[tracing::instrument(skip_all)]
2615 pub async fn update_document(
2616 &self,
2617 config: UpdateDocumentRequest,
2618 env: EnvConfig,
2619 ) -> Result<UpdateDocumentResponse, OpenIAPError> {
2620 let mut envelope = config.to_envelope();
2621 if !env.jwt.is_empty() {
2622 envelope.jwt = env.jwt;
2623 }
2624 if !env.spanid.is_empty() {
2625 envelope.spanid = env.spanid;
2626 }
2627 if !env.traceid.is_empty() {
2628 envelope.traceid = env.traceid;
2629 }
2630 let result = self.send(envelope, None).await;
2631 match result {
2632 Ok(m) => {
2633 let data = match m.data {
2634 Some(data) => data,
2635 None => {
2636 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2637 }
2638 };
2639 if m.command == "error" {
2640 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2641 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2642 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2643 }
2644 let response: UpdateDocumentResponse = prost::Message::decode(data.value.as_ref())
2645 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2646 Ok(response)
2647 }
2648 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2649 }
2650 }
2651 #[tracing::instrument(skip_all)]
2653 pub async fn delete_one(&self, config: DeleteOneRequest, env: EnvConfig) -> Result<i32, OpenIAPError> {
2654 let mut envelope = config.to_envelope();
2655 if !env.jwt.is_empty() {
2656 envelope.jwt = env.jwt;
2657 }
2658 if !env.spanid.is_empty() {
2659 envelope.spanid = env.spanid;
2660 }
2661 if !env.traceid.is_empty() {
2662 envelope.traceid = env.traceid;
2663 }
2664 let result = self.send(envelope, None).await;
2665 match result {
2666 Ok(m) => {
2667 let data = match m.data {
2668 Some(data) => data,
2669 None => {
2670 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2671 }
2672 };
2673 if m.command == "error" {
2674 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2675 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2676 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2677 }
2678 let response: DeleteOneResponse = prost::Message::decode(data.value.as_ref())
2679 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2680 Ok(response.affectedrows)
2681 }
2682 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2683 }
2684 }
2685 #[tracing::instrument(skip_all)]
2687 pub async fn delete_many(&self, config: DeleteManyRequest, env: EnvConfig) -> Result<i32, OpenIAPError> {
2688 let mut envelope = config.to_envelope();
2689 if !env.jwt.is_empty() {
2690 envelope.jwt = env.jwt;
2691 }
2692 if !env.spanid.is_empty() {
2693 envelope.spanid = env.spanid;
2694 }
2695 if !env.traceid.is_empty() {
2696 envelope.traceid = env.traceid;
2697 }
2698 let result = self.send(envelope, None).await;
2699 match result {
2700 Ok(m) => {
2701 let data = match m.data {
2702 Some(data) => data,
2703 None => {
2704 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2705 }
2706 };
2707 if m.command == "error" {
2708 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2709 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2710 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2711 }
2712 let response: DeleteManyResponse = prost::Message::decode(data.value.as_ref())
2713 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2714 Ok(response.affectedrows)
2715 }
2716 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2717 }
2718 }
2719 #[tracing::instrument(skip_all)]
2721 pub async fn download(
2722 &self,
2723 config: DownloadRequest,
2724 env: EnvConfig,
2725 folder: Option<&str>,
2726 filename: Option<&str>,
2727 ) -> Result<DownloadResponse, OpenIAPError> {
2728 let mut envelope = config.to_envelope();
2729 if !env.jwt.is_empty() {
2730 envelope.jwt = env.jwt;
2731 }
2732 if !env.spanid.is_empty() {
2733 envelope.spanid = env.spanid;
2734 }
2735 if !env.traceid.is_empty() {
2736 envelope.traceid = env.traceid;
2737 }
2738 match self.sendwithstream(envelope).await {
2739 Ok((response_rx, mut stream_rx)) => {
2740 let temp_file_path = util::generate_unique_filename("openiap");
2741 debug!("Temp file: {:?}", temp_file_path);
2742 let mut temp_file = File::create(&temp_file_path).map_err(|e| {
2743 OpenIAPError::ClientError(format!("Failed to create temp file: {}", e))
2744 })?;
2745 while !stream_rx.is_closed() {
2746 match stream_rx.recv().await {
2747 Some(received) => {
2748 if received.is_empty() {
2749 debug!("Stream closed");
2750 break;
2751 }
2752 debug!("Received {} bytes", received.len());
2753 temp_file.write_all(&received).map_err(|e| {
2754 OpenIAPError::ClientError(format!(
2755 "Failed to write to temp file: {}",
2756 e
2757 ))
2758 })?;
2759 }
2760 None => {
2761 debug!("Stream closed");
2762 break;
2763 }
2764 }
2765 }
2766 temp_file.sync_all().map_err(|e| {
2767 OpenIAPError::ClientError(format!("Failed to sync temp file: {}", e))
2768 })?;
2769
2770 let response = response_rx.await.map_err(|_| {
2771 OpenIAPError::ClientError("Failed to receive response".to_string())
2772 })?;
2773
2774 if response.command == "error" {
2775 let data = match response.data {
2776 Some(data) => data,
2777 None => {
2778 return Err(OpenIAPError::ClientError(
2779 "No data returned for SERVER error".to_string(),
2780 ));
2781 }
2782 };
2783 let e: ErrorResponse = prost::Message::decode(data.value.as_ref()).unwrap();
2784 return Err(OpenIAPError::ServerError(e.message));
2785 }
2786 let mut downloadresponse: DownloadResponse =
2787 prost::Message::decode(response.data.unwrap().value.as_ref()).unwrap();
2788
2789 let mut final_filename = match &filename {
2790 Some(f) => f,
2791 None => downloadresponse.filename.as_str(),
2792 };
2793 if final_filename.is_empty() {
2794 final_filename = downloadresponse.filename.as_str();
2795 }
2796 let mut folder = match &folder {
2797 Some(f) => f,
2798 None => ".",
2799 };
2800 if folder.is_empty() {
2801 folder = ".";
2802 }
2803 let filepath = format!("{}/{}", folder, final_filename);
2804 trace!("Moving file to {}", filepath);
2805 util::move_file(temp_file_path.to_str().unwrap(), filepath.as_str()).map_err(|e| {
2806 OpenIAPError::ClientError(format!("Failed to move file: {}", e))
2807 })?;
2808 debug!("Downloaded file to {}", filepath);
2809 downloadresponse.filename = filepath;
2810
2811 Ok(downloadresponse)
2812 }
2813 Err(status) => Err(OpenIAPError::ClientError(status.to_string())),
2814 }
2815 }
2816 #[tracing::instrument(skip_all)]
2818 pub async fn upload(
2819 &self,
2820 config: UploadRequest,
2821 env: EnvConfig,
2822 filepath: &str,
2823 ) -> Result<UploadResponse, OpenIAPError> {
2824 debug!("upload: Uploading file: {}", filepath);
2883 let mut file = File::open(filepath)
2884 .map_err(|e| OpenIAPError::ClientError(format!("Failed to open file: {}", e)))?;
2885 let chunk_size = 1024 * 1024;
2886 let mut buffer = vec![0; chunk_size];
2887
2888 let mut envelope = config.to_envelope();
2890 if !env.jwt.is_empty() {
2891 envelope.jwt = env.jwt;
2892 }
2893 if !env.spanid.is_empty() {
2894 envelope.spanid = env.spanid;
2895 }
2896 if !env.traceid.is_empty() {
2897 envelope.traceid = env.traceid;
2898 }
2899 let (response_rx, rid) = self.send_noawait(envelope).await?;
2900
2901 let envelope = BeginStream::from_rid(rid.clone());
2903 debug!("Sending beginstream to #{}", rid);
2904 if let Err(e) = self.send_envelope(envelope).await {
2905 let inner = self.inner.lock().await;
2906 inner.queries.lock().await.remove(&rid);
2907 return Err(OpenIAPError::ClientError(format!("Failed to send data: {}", e)));
2908 }
2909
2910 let mut counter = 0;
2912 loop {
2913 let bytes_read = file.read(&mut buffer).map_err(|e| {
2914 OpenIAPError::ClientError(format!("Failed to read from file: {}", e))
2915 })?;
2916 counter += 1;
2917
2918 if bytes_read == 0 {
2919 break;
2920 }
2921
2922 let chunk = buffer[..bytes_read].to_vec();
2923 let envelope = Stream::from_rid(chunk, rid.clone());
2924 debug!("Sending chunk {} stream to #{}", counter, envelope.rid);
2925 if let Err(e) = self.send_envelope(envelope).await {
2926 let inner = self.inner.lock().await;
2927 inner.queries.lock().await.remove(&rid);
2928 return Err(OpenIAPError::ClientError(format!("Failed to send data: {}", e)));
2929 }
2930 }
2931
2932 let envelope = EndStream::from_rid(rid.clone());
2934 debug!("Sending endstream to #{}", rid);
2935 if let Err(e) = self.send_envelope(envelope).await {
2936 let inner = self.inner.lock().await;
2937 inner.queries.lock().await.remove(&rid);
2938 return Err(OpenIAPError::ClientError(format!("Failed to send data: {}", e)));
2939 }
2940
2941 debug!("Wait for upload response for #{}", rid);
2943 let result = response_rx.await;
2944 let inner = self.inner.lock().await;
2945 inner.queries.lock().await.remove(&rid);
2946
2947 match result {
2948 Ok(response) => {
2949 if response.command == "error" {
2950 let error_response: ErrorResponse = prost::Message::decode(
2951 response.data.unwrap().value.as_ref(),
2952 )
2953 .map_err(|e| {
2954 OpenIAPError::ClientError(format!("Failed to decode ErrorResponse: {}", e))
2955 })?;
2956 return Err(OpenIAPError::ServerError(error_response.message));
2957 }
2958 let upload_response: UploadResponse =
2959 prost::Message::decode(response.data.unwrap().value.as_ref()).map_err(|e| {
2960 OpenIAPError::ClientError(format!("Failed to decode UploadResponse: {}", e))
2961 })?;
2962 Ok(upload_response)
2963 }
2964 Err(e) => Err(OpenIAPError::CustomError(e.to_string())),
2965 }
2966 }
2967 #[tracing::instrument(skip_all)]
2969 pub async fn watch(
2970 &self,
2971 mut config: WatchRequest,
2972 env: EnvConfig,
2973 callback: Box<dyn Fn(WatchEvent) + Send + Sync>,
2974 ) -> Result<String, OpenIAPError> {
2975 if config.collectionname.is_empty() {
2976 config.collectionname = "entities".to_string();
2977 }
2978 if config.paths.is_empty() {
2979 config.paths = vec!["".to_string()];
2980 }
2981 let mut envelope = config.to_envelope();
2982 if !env.jwt.is_empty() {
2983 envelope.jwt = env.jwt;
2984 }
2985 if !env.spanid.is_empty() {
2986 envelope.spanid = env.spanid;
2987 }
2988 if !env.traceid.is_empty() {
2989 envelope.traceid = env.traceid;
2990 }
2991 let result = self.send(envelope, None).await;
2992 match result {
2993 Ok(m) => {
2994 let data = match m.data {
2995 Some(data) => data,
2996 None => {
2997 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2998 }
2999 };
3000 if m.command == "error" {
3001 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3002 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3003 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3004 }
3005 let response: WatchResponse = prost::Message::decode(data.value.as_ref())
3006 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3007
3008 let inner = self.inner.lock().await;
3009 inner
3010 .watches
3011 .lock()
3012 .await
3013 .insert(response.id.clone(), callback);
3014
3015 Ok(response.id)
3016 }
3017 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3018 }
3019 }
3020 #[tracing::instrument(skip_all)]
3022 pub async fn unwatch(&self, env: EnvConfig, id: &str) -> Result<(), OpenIAPError> {
3023 let config = UnWatchRequest::byid(id);
3024 let mut envelope = config.to_envelope();
3025 if !env.jwt.is_empty() {
3026 envelope.jwt = env.jwt;
3027 }
3028 if !env.spanid.is_empty() {
3029 envelope.spanid = env.spanid;
3030 }
3031 if !env.traceid.is_empty() {
3032 envelope.traceid = env.traceid;
3033 }
3034 let result = self.send(envelope, None).await;
3035 match result {
3036 Ok(m) => {
3037 let data = match m.data {
3038 Some(data) => data,
3039 None => {
3040 return Err(OpenIAPError::ClientError("No data returned".to_string()));
3041 }
3042 };
3043 if m.command == "error" {
3044 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3045 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3046 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3047 }
3048 Ok(())
3049 }
3050 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3051 }
3052 }
3053 #[tracing::instrument(skip_all)]
3055 pub async fn register_queue(
3056 &self,
3057 mut config: RegisterQueueRequest,
3058 env: EnvConfig,
3059 callback: QueueCallbackFn,
3060 ) -> Result<String, OpenIAPError> {
3061 if config.queuename.is_empty() {
3062 config.queuename = "".to_string();
3063 }
3064 let mut envelope = config.to_envelope();
3065 if !env.jwt.is_empty() {
3066 envelope.jwt = env.jwt;
3067 }
3068 if !env.spanid.is_empty() {
3069 envelope.spanid = env.spanid;
3070 }
3071 if !env.traceid.is_empty() {
3072 envelope.traceid = env.traceid;
3073 }
3074 let result = self.send(envelope, None).await;
3075 match result {
3076 Ok(m) => {
3077 let data = match m.data {
3078 Some(data) => data,
3079 None => {
3080 return Err(OpenIAPError::ClientError("No data returned".to_string()));
3081 }
3082 };
3083 if m.command == "error" {
3084 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3085 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3086 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3087 }
3088 let response: RegisterQueueResponse =
3089 prost::Message::decode(data.value.as_ref())
3090 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3091
3092 let inner = self.inner.lock().await;
3093 inner
3094 .queues
3095 .lock()
3096 .await
3097 .insert(response.queuename.clone(), callback);
3098
3099 Ok(response.queuename)
3100 }
3101 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3102 }
3103 }
3104 #[tracing::instrument(skip_all)]
3106 pub async fn unregister_queue(&self, env: EnvConfig, queuename: &str) -> Result<(), OpenIAPError> {
3107 let config = UnRegisterQueueRequest::byqueuename(queuename);
3108 let mut envelope = config.to_envelope();
3109 if !env.jwt.is_empty() {
3110 envelope.jwt = env.jwt;
3111 }
3112 if !env.spanid.is_empty() {
3113 envelope.spanid = env.spanid;
3114 }
3115 if !env.traceid.is_empty() {
3116 envelope.traceid = env.traceid;
3117 }
3118 let result = self.send(envelope, None).await;
3119 match result {
3120 Ok(m) => {
3121 let data = match m.data {
3122 Some(data) => data,
3123 None => {
3124 return Err(OpenIAPError::ClientError("No data returned".to_string()));
3125 }
3126 };
3127 if m.command == "error" {
3128 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3129 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3130 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3131 }
3132 Ok(())
3133 }
3134 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3135 }
3136 }
3137 #[tracing::instrument(skip_all)]
3139 pub async fn register_exchange(
3140 &self,
3141 mut config: RegisterExchangeRequest,
3142 env: EnvConfig,
3143 callback: QueueCallbackFn,
3144 ) -> Result<String, OpenIAPError> {
3145 if config.exchangename.is_empty() {
3146 return Err(OpenIAPError::ClientError(
3147 "No exchange name provided".to_string(),
3148 ));
3149 }
3150 if config.algorithm.is_empty() {
3151 config.algorithm = "fanout".to_string();
3152 }
3153 let mut envelope = config.to_envelope();
3154 if !env.jwt.is_empty() {
3155 envelope.jwt = env.jwt;
3156 }
3157 if !env.spanid.is_empty() {
3158 envelope.spanid = env.spanid;
3159 }
3160 if !env.traceid.is_empty() {
3161 envelope.traceid = env.traceid;
3162 }
3163 let result = self.send(envelope, None).await;
3164 match result {
3165 Ok(m) => {
3166 let data = match m.data {
3167 Some(data) => data,
3168 None => {
3169 return Err(OpenIAPError::ClientError("No data returned".to_string()));
3170 }
3171 };
3172 if m.command == "error" {
3173 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3174 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3175 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3176 }
3177 let response: RegisterExchangeResponse =
3178 prost::Message::decode(data.value.as_ref())
3179 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3180 if !response.queuename.is_empty() {
3181 let inner = self.inner.lock().await;
3182 inner
3183 .queues
3184 .lock()
3185 .await
3186 .insert(response.queuename.clone(), callback);
3187 }
3188 Ok(response.queuename)
3189 }
3190 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3191 }
3192 }
3193 #[tracing::instrument(skip_all)]
3195 pub async fn queue_message(
3196 &self,
3197 config: QueueMessageRequest,
3198 env: EnvConfig,
3199 ) -> Result<QueueMessageResponse, OpenIAPError> {
3200 if config.queuename.is_empty() && config.exchangename.is_empty() {
3201 return Err(OpenIAPError::ClientError(
3202 "No queue or exchange name provided".to_string(),
3203 ));
3204 }
3205 let mut envelope = config.to_envelope();
3206 if !env.jwt.is_empty() {
3207 envelope.jwt = env.jwt;
3208 }
3209 if !env.spanid.is_empty() {
3210 envelope.spanid = env.spanid;
3211 }
3212 if !env.traceid.is_empty() {
3213 envelope.traceid = env.traceid;
3214 }
3215 let result = self.send(envelope, None).await;
3216 match result {
3217 Ok(m) => {
3218 let data = match m.data {
3219 Some(d) => d,
3220 None => {
3221 return Err(OpenIAPError::ClientError("No data in response".to_string()))
3222 }
3223 };
3224 if m.command == "error" {
3225 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3226 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3227 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3228 }
3229 let response: QueueMessageResponse = prost::Message::decode(data.value.as_ref())
3230 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3231 Ok(response)
3232 }
3233 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3234 }
3235 }
3236 #[tracing::instrument(skip_all)]
3238 pub async fn rpc(&self, mut config: QueueMessageRequest, env: EnvConfig, timeout: tokio::time::Duration) -> Result<String, OpenIAPError> {
3239 if config.queuename.is_empty() && config.exchangename.is_empty() {
3240 return Err(OpenIAPError::ClientError(
3241 "No queue or exchange name provided".to_string(),
3242 ));
3243 }
3244
3245 let (tx, rx) = oneshot::channel::<String>();
3246 let tx = Arc::new(std::sync::Mutex::new(Some(tx)));
3247
3248 let callback: QueueCallbackFn = Arc::new(move |_client: Arc<Client>, event: QueueEvent| {
3250 if let Some(tx) = tx.lock().unwrap().take() {
3251 let _ = tx.send(event.data);
3252 } else {
3253 debug!("Queue already closed");
3254 }
3255 Box::pin(async { None })
3256 });
3257
3258 let mut reply_queue_guard = self.rpc_reply_queue.lock().await;
3260 let mut callback_guard = self.rpc_callback.lock().await;
3261
3262 let state = self.get_state();
3264 if reply_queue_guard.is_none() || !(state == ClientState::Connected || state == ClientState::Signedin) {
3265 let q = self
3267 .register_queue(
3268 RegisterQueueRequest {
3269 queuename: "".to_string(),
3270 },
3271 crate::EnvConfig::new(),
3272 callback.clone(),
3273 )
3274 .await?;
3275 *reply_queue_guard = Some(q.clone());
3276 *callback_guard = Some(callback.clone());
3277 } else {
3278 if let Some(qname) = reply_queue_guard.as_ref() {
3280 let inner = self.inner.lock().await;
3281 inner.queues.lock().await.insert(qname.clone(), callback.clone());
3282 }
3283 }
3284
3285 let q = reply_queue_guard.as_ref().unwrap().clone();
3286 config.replyto = q.clone();
3287 let mut envelope = config.to_envelope();
3288 if !env.jwt.is_empty() {
3289 envelope.jwt = env.jwt;
3290 }
3291 if !env.spanid.is_empty() {
3292 envelope.spanid = env.spanid;
3293 }
3294 if !env.traceid.is_empty() {
3295 envelope.traceid = env.traceid;
3296 }
3297
3298 let result = self.send(envelope, None).await;
3299 let rpc_result = match result {
3300 Ok(m) => {
3301 let data = match m.data {
3302 Some(d) => d,
3303 None => {
3304 return Err(OpenIAPError::ClientError("No data in response".to_string()))
3305 }
3306 };
3307 if m.command == "error" {
3308 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3309 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3310 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3311 }
3312
3313 match tokio::time::timeout(timeout, rx).await {
3314 Ok(Ok(val)) => Ok(val),
3315 Ok(Err(e)) => Err(OpenIAPError::CustomError(e.to_string())),
3316 Err(_) => {
3317 *reply_queue_guard = None;
3319 *callback_guard = None;
3320 Err(OpenIAPError::ClientError("RPC request timed out".to_string()))
3321 },
3322 }
3323 }
3324 Err(e) => {
3325 *reply_queue_guard = None;
3327 *callback_guard = None;
3328 Err(OpenIAPError::ClientError(e.to_string()))
3329 }
3330 };
3331
3332 rpc_result
3333 }
3334 #[tracing::instrument(skip_all)]
3408 pub async fn push_workitem(
3409 &self,
3410 mut config: PushWorkitemRequest,
3411 env: EnvConfig,
3412 ) -> Result<PushWorkitemResponse, OpenIAPError> {
3413 if config.wiq.is_empty() && config.wiqid.is_empty() {
3414 return Err(OpenIAPError::ClientError(
3415 "No queue name or id provided".to_string(),
3416 ));
3417 }
3418 for f in &mut config.files {
3419 if f.filename.is_empty() && f.file.is_empty() {
3420 debug!("Filename is empty");
3421 } else if !f.filename.is_empty() && f.file.is_empty() && f.id.is_empty() {
3422 if !std::path::Path::new(&f.filename).exists() {
3424 debug!("File does not exist: {}", f.filename);
3425 } else {
3426 let filesize = std::fs::metadata(&f.filename).unwrap().len();
3427 if filesize < 5 * 1024 * 1024 {
3429 debug!("File {} exists so ATTACHING it.", f.filename);
3430 let filename = std::path::Path::new(&f.filename)
3431 .file_name()
3432 .unwrap()
3433 .to_str()
3434 .unwrap();
3435 f.file = std::fs::read(&f.filename).unwrap();
3436 f.file = util::compress_file_to_vec(&f.filename).unwrap();
3439 f.compressed = true;
3440 f.filename = filename.to_string();
3441 f.id = "findme".to_string();
3442 trace!(
3443 "File {} was read and assigned to f.file, size: {}",
3444 f.filename,
3445 f.file.len()
3446 );
3447 } else {
3448 debug!("File {} exists so UPLOADING it.", f.filename);
3449 let filename = std::path::Path::new(&f.filename)
3450 .file_name()
3451 .unwrap()
3452 .to_str()
3453 .unwrap();
3454 let uploadconfig = UploadRequest {
3455 filename: filename.to_string(),
3456 collectionname: "fs.files".to_string(),
3457 ..Default::default()
3458 };
3459 let uploadresult = self.upload(uploadconfig, crate::EnvConfig::new(), &f.filename).await.unwrap();
3460 trace!("File {} was upload as {}", filename, uploadresult.id);
3461 f.id = uploadresult.id.clone();
3463 f.filename = filename.to_string();
3464 }
3465 }
3466 } else {
3467 debug!("File {} is already uploaded", f.filename);
3468 }
3469 }
3470 let mut envelope = config.to_envelope();
3471 if !env.jwt.is_empty() {
3472 envelope.jwt = env.jwt;
3473 }
3474 if !env.spanid.is_empty() {
3475 envelope.spanid = env.spanid;
3476 }
3477 if !env.traceid.is_empty() {
3478 envelope.traceid = env.traceid;
3479 }
3480 let result = self.send(envelope, None).await;
3481 match result {
3482 Ok(m) => {
3483 let data = match m.data {
3484 Some(data) => data,
3485 None => {
3486 return Err(OpenIAPError::ClientError("No data returned".to_string()));
3487 }
3488 };
3489 if m.command == "error" {
3490 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3491 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3492 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3493 }
3494 let response: PushWorkitemResponse = prost::Message::decode(data.value.as_ref())
3495 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3496 Ok(response)
3497 }
3498 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3499 }
3500 }
3501 #[tracing::instrument(skip_all)]
3505 pub async fn push_workitems(
3506 &self,
3507 mut config: PushWorkitemsRequest,
3508 env: EnvConfig,
3509 ) -> Result<PushWorkitemsResponse, OpenIAPError> {
3510 if config.wiq.is_empty() && config.wiqid.is_empty() {
3511 return Err(OpenIAPError::ClientError(
3512 "No queue name or id provided".to_string(),
3513 ));
3514 }
3515 for wi in &mut config.items {
3516 for f in &mut wi.files {
3517 if f.filename.is_empty() && f.file.is_empty() {
3518 debug!("Filename is empty");
3519 } else if !f.filename.is_empty() && f.file.is_empty() && f.id.is_empty() {
3520 if !std::path::Path::new(&f.filename).exists() {
3522 debug!("File does not exist: {}", f.filename);
3523 } else {
3524 let filesize = std::fs::metadata(&f.filename).unwrap().len();
3525 if filesize < 5 * 1024 * 1024 {
3527 debug!("File {} exists so ATTACHING it.", f.filename);
3528 let filename = std::path::Path::new(&f.filename)
3529 .file_name()
3530 .unwrap()
3531 .to_str()
3532 .unwrap();
3533 f.file = std::fs::read(&f.filename).unwrap();
3534 f.file = util::compress_file_to_vec(&f.filename).unwrap();
3537 f.compressed = true;
3538 f.filename = filename.to_string();
3539 f.id = "findme".to_string();
3540 trace!(
3541 "File {} was read and assigned to f.file, size: {}",
3542 f.filename,
3543 f.file.len()
3544 );
3545 } else {
3546 debug!("File {} exists so UPLOADING it.", f.filename);
3547 let filename = std::path::Path::new(&f.filename)
3548 .file_name()
3549 .unwrap()
3550 .to_str()
3551 .unwrap();
3552 let uploadconfig = UploadRequest {
3553 filename: filename.to_string(),
3554 collectionname: "fs.files".to_string(),
3555 ..Default::default()
3556 };
3557 let uploadresult =
3558 self.upload(uploadconfig, crate::EnvConfig::new(), &f.filename).await.unwrap();
3559 trace!("File {} was upload as {}", filename, uploadresult.id);
3560 f.id = uploadresult.id.clone();
3562 f.filename = filename.to_string();
3563 }
3564 }
3565 } else {
3566 debug!("File {} is already uploaded", f.filename);
3567 }
3568 }
3569 }
3570 let mut envelope = config.to_envelope();
3571 if !env.jwt.is_empty() {
3572 envelope.jwt = env.jwt;
3573 }
3574 if !env.spanid.is_empty() {
3575 envelope.spanid = env.spanid;
3576 }
3577 if !env.traceid.is_empty() {
3578 envelope.traceid = env.traceid;
3579 }
3580 let result = self.send(envelope, None).await;
3581 match result {
3582 Ok(m) => {
3583 let data = match m.data {
3584 Some(data) => data,
3585 None => {
3586 return Err(OpenIAPError::ClientError("No data returned".to_string()));
3587 }
3588 };
3589 if m.command == "error" {
3590 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3591 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3592 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3593 }
3594 let response: PushWorkitemsResponse =
3595 prost::Message::decode(data.value.as_ref())
3596 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3597 Ok(response)
3598 }
3599 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3600 }
3601 }
3602 #[tracing::instrument(skip_all)]
3605 pub async fn pop_workitem(
3606 &self,
3607 config: PopWorkitemRequest,
3608 env: EnvConfig,
3609 downloadfolder: Option<&str>,
3610 ) -> Result<PopWorkitemResponse, OpenIAPError> {
3611 if config.wiq.is_empty() && config.wiqid.is_empty() {
3612 return Err(OpenIAPError::ClientError(
3613 "No queue name or id provided".to_string(),
3614 ));
3615 }
3616 let mut envelope = config.to_envelope();
3617 if !env.jwt.is_empty() {
3618 envelope.jwt = env.jwt;
3619 }
3620 if !env.spanid.is_empty() {
3621 envelope.spanid = env.spanid;
3622 }
3623 if !env.traceid.is_empty() {
3624 envelope.traceid = env.traceid;
3625 }
3626 let result = self.send(envelope, None).await;
3627 match result {
3628 Ok(m) => {
3629 let data = match m.data {
3630 Some(data) => data,
3631 None => {
3632 return Err(OpenIAPError::ClientError("No data returned".to_string()));
3633 }
3634 };
3635 if m.command == "error" {
3636 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3637 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3638 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3639 }
3640 let response: PopWorkitemResponse = prost::Message::decode(data.value.as_ref())
3641 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3642
3643 match &response.workitem {
3644 Some(wi) => {
3645 for f in &wi.files {
3646 if !f.id.is_empty() {
3647 let downloadconfig = DownloadRequest {
3648 id: f.id.clone(),
3649 collectionname: "fs.files".to_string(),
3650 ..Default::default()
3651 };
3652 let downloadresult =
3653 match self.download(downloadconfig,
3654 crate::EnvConfig::new(),
3655 downloadfolder, None).await
3656 {
3657 Ok(r) => r,
3658 Err(e) => {
3659 debug!("Failed to download file: {}", e);
3660 continue;
3661 }
3662 };
3663 debug!(
3664 "File {} was downloaded as {}",
3665 f.filename, downloadresult.filename
3666 );
3667 }
3668 }
3669 }
3670 None => {
3671 debug!("No workitem found");
3672 }
3673 }
3674 Ok(response)
3675 }
3676 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3677 }
3678 }
3679 #[tracing::instrument(skip_all)]
3685 pub async fn update_workitem(
3686 &self,
3687 mut config: UpdateWorkitemRequest,
3688 env: EnvConfig,
3689 ) -> Result<UpdateWorkitemResponse, OpenIAPError> {
3690 match &config.workitem {
3691 Some(wiq) => {
3692 if wiq.id.is_empty() {
3693 return Err(OpenIAPError::ClientError(
3694 "No workitem id provided".to_string(),
3695 ));
3696 }
3697 }
3698 None => {
3699 return Err(OpenIAPError::ClientError(
3700 "No workitem provided".to_string(),
3701 ));
3702 }
3703 }
3704 for f in &mut config.files {
3705 if f.filename.is_empty() && f.file.is_empty() {
3706 debug!("Filename is empty");
3707 } else if !f.filename.is_empty() && f.file.is_empty() && f.id.is_empty() {
3708 if !std::path::Path::new(&f.filename).exists() {
3709 debug!("File does not exist: {}", f.filename);
3710 } else {
3711 debug!("File {} exists so uploading it.", f.filename);
3712 let filename = std::path::Path::new(&f.filename)
3713 .file_name()
3714 .unwrap()
3715 .to_str()
3716 .unwrap();
3717 let uploadconfig = UploadRequest {
3718 filename: filename.to_string(),
3719 collectionname: "fs.files".to_string(),
3720 ..Default::default()
3721 };
3722 let uploadresult = self.upload(uploadconfig, crate::EnvConfig::new(), &f.filename).await.unwrap();
3723 trace!("File {} was upload as {}", filename, uploadresult.id);
3724 f.id = uploadresult.id.clone();
3725 f.filename = filename.to_string();
3726 }
3727 } else {
3728 debug!("Skipped file");
3729 }
3730 }
3731 let mut envelope = config.to_envelope();
3732 if !env.jwt.is_empty() {
3733 envelope.jwt = env.jwt;
3734 }
3735 if !env.spanid.is_empty() {
3736 envelope.spanid = env.spanid;
3737 }
3738 if !env.traceid.is_empty() {
3739 envelope.traceid = env.traceid;
3740 }
3741 let result = self.send(envelope, None).await;
3742 match result {
3743 Ok(m) => {
3744 let data = match m.data {
3745 Some(d) => d,
3746 None => {
3747 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3748 }
3749 };
3750 if m.command == "error" {
3751 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3752 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3753 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3754 }
3755 let response: UpdateWorkitemResponse = prost::Message::decode(data.value.as_ref())
3756 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3757 Ok(response)
3758 }
3759 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3760 }
3761 }
3762 #[tracing::instrument(skip_all)]
3764 pub async fn delete_workitem(
3765 &self,
3766 config: DeleteWorkitemRequest,
3767 env: EnvConfig,
3768 ) -> Result<DeleteWorkitemResponse, OpenIAPError> {
3769 if config.id.is_empty() {
3770 return Err(OpenIAPError::ClientError(
3771 "No workitem id provided".to_string(),
3772 ));
3773 }
3774 let mut envelope = config.to_envelope();
3775 if !env.jwt.is_empty() {
3776 envelope.jwt = env.jwt;
3777 }
3778 if !env.spanid.is_empty() {
3779 envelope.spanid = env.spanid;
3780 }
3781 if !env.traceid.is_empty() {
3782 envelope.traceid = env.traceid;
3783 }
3784 let result = self.send(envelope, None).await;
3785 match result {
3786 Ok(m) => {
3787 let data = match m.data {
3788 Some(d) => d,
3789 None => {
3790 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3791 }
3792 };
3793 if m.command == "error" {
3794 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3795 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3796 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3797 }
3798 let response: DeleteWorkitemResponse = prost::Message::decode(data.value.as_ref())
3799 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3800 Ok(response)
3801 }
3802 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3803 }
3804 }
3805 #[tracing::instrument(skip_all)]
3807 pub async fn add_workitem_queue(
3808 &self,
3809 config: AddWorkItemQueueRequest,
3810 env: EnvConfig,
3811 ) -> Result<WorkItemQueue, OpenIAPError> {
3812 if config.workitemqueue.is_none() {
3813 return Err(OpenIAPError::ClientError(
3814 "No workitem queue name provided".to_string(),
3815 ));
3816 }
3817 let mut envelope = config.to_envelope();
3818 if !env.jwt.is_empty() {
3819 envelope.jwt = env.jwt;
3820 }
3821 if !env.spanid.is_empty() {
3822 envelope.spanid = env.spanid;
3823 }
3824 if !env.traceid.is_empty() {
3825 envelope.traceid = env.traceid;
3826 }
3827 let result = self.send(envelope, None).await;
3828 match result {
3829 Ok(m) => {
3830 let data = match m.data {
3831 Some(d) => d,
3832 None => {
3833 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3834 }
3835 };
3836 if m.command == "error" {
3837 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3838 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3839 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3840 }
3841 let response: AddWorkItemQueueResponse =
3842 prost::Message::decode(data.value.as_ref())
3843 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3844 match response.workitemqueue {
3845 Some(wiq) => Ok(wiq),
3846 None => {
3847 return Err(OpenIAPError::ClientError(
3848 "No workitem queue returned".to_string(),
3849 ));
3850 }
3851 }
3852 }
3853 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3854 }
3855 }
3856 #[tracing::instrument(skip_all)]
3858 pub async fn update_workitem_queue(
3859 &self,
3860 config: UpdateWorkItemQueueRequest,
3861 env: EnvConfig,
3862 ) -> Result<WorkItemQueue, OpenIAPError> {
3863 if config.workitemqueue.is_none() {
3864 return Err(OpenIAPError::ClientError(
3865 "No workitem queue name provided".to_string(),
3866 ));
3867 }
3868 let mut envelope = config.to_envelope();
3869 if !env.jwt.is_empty() {
3870 envelope.jwt = env.jwt;
3871 }
3872 if !env.spanid.is_empty() {
3873 envelope.spanid = env.spanid;
3874 }
3875 if !env.traceid.is_empty() {
3876 envelope.traceid = env.traceid;
3877 }
3878 let result = self.send(envelope, None).await;
3879 match result {
3880 Ok(m) => {
3881 let data = match m.data {
3882 Some(d) => d,
3883 None => {
3884 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3885 }
3886 };
3887 if m.command == "error" {
3888 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3889 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3890 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3891 }
3892 let response: UpdateWorkItemQueueResponse =
3893 prost::Message::decode(data.value.as_ref())
3894 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3895 match response.workitemqueue {
3896 Some(wiq) => Ok(wiq),
3897 None => {
3898 return Err(OpenIAPError::ClientError(
3899 "No workitem queue returned".to_string(),
3900 ));
3901 }
3902 }
3903 }
3904 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3905 }
3906 }
3907 #[tracing::instrument(skip_all)]
3909 pub async fn delete_workitem_queue(
3910 &self,
3911 config: DeleteWorkItemQueueRequest,
3912 env: EnvConfig,
3913 ) -> Result<(), OpenIAPError> {
3914 if config.wiq.is_empty() && config.wiqid.is_empty() {
3915 return Err(OpenIAPError::ClientError(
3916 "No workitem queue name or id provided".to_string(),
3917 ));
3918 }
3919 let mut envelope = config.to_envelope();
3920 if !env.jwt.is_empty() {
3921 envelope.jwt = env.jwt;
3922 }
3923 if !env.spanid.is_empty() {
3924 envelope.spanid = env.spanid;
3925 }
3926 if !env.traceid.is_empty() {
3927 envelope.traceid = env.traceid;
3928 }
3929 let result = self.send(envelope, None).await;
3930 match result {
3931 Ok(m) => {
3932 let data = match m.data {
3933 Some(d) => d,
3934 None => {
3935 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3936 }
3937 };
3938 if m.command == "error" {
3939 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3940 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3941 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3942 }
3943 Ok(())
3944 }
3945 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3946 }
3947 }
3948 #[tracing::instrument(skip_all)]
3950 pub async fn custom_command(
3951 &self,
3952 config: CustomCommandRequest,
3953 env: EnvConfig,
3954 timeout: Option<tokio::time::Duration>,
3955 ) -> Result<String, OpenIAPError> {
3956 if config.command.is_empty() {
3957 return Err(OpenIAPError::ClientError("No command provided".to_string()));
3958 }
3959 let mut envelope = config.to_envelope();
3960 if !env.jwt.is_empty() {
3961 envelope.jwt = env.jwt;
3962 }
3963 if !env.spanid.is_empty() {
3964 envelope.spanid = env.spanid;
3965 }
3966 if !env.traceid.is_empty() {
3967 envelope.traceid = env.traceid;
3968 }
3969 let result = self.send(envelope, timeout).await;
3970 match result {
3971 Ok(m) => {
3972 let data = match m.data {
3973 Some(d) => d,
3974 None => {
3975 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3976 }
3977 };
3978 if m.command == "error" {
3979 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3980 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3981 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3982 }
3983 let response: CustomCommandResponse =
3984 prost::Message::decode(data.value.as_ref())
3985 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3986 Ok(response.result)
3987 }
3988 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3989 }
3990 }
3991 #[tracing::instrument(skip_all)]
3993 pub async fn delete_package(&self, env: EnvConfig, packageid: &str) -> Result<(), OpenIAPError> {
3994 let config = DeletePackageRequest::byid(packageid);
3995 let mut envelope = config.to_envelope();
3996 if !env.jwt.is_empty() {
3997 envelope.jwt = env.jwt;
3998 }
3999 if !env.spanid.is_empty() {
4000 envelope.spanid = env.spanid;
4001 }
4002 if !env.traceid.is_empty() {
4003 envelope.traceid = env.traceid;
4004 }
4005 let result = self.send(envelope, None).await;
4006 match result {
4007 Ok(m) => {
4008 let data = match m.data {
4009 Some(data) => data,
4010 None => {
4011 return Err(OpenIAPError::ClientError("No data returned".to_string()));
4012 }
4013 };
4014 if m.command == "error" {
4015 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
4016 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
4017 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
4018 }
4019 Ok(())
4022 }
4023 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
4024 }
4025 }
4026 #[tracing::instrument(skip_all)]
4028 pub async fn start_agent(&self, env: EnvConfig, agentid: &str) -> Result<(), OpenIAPError> {
4029 let config = StartAgentRequest::byid(agentid);
4030 let mut envelope = config.to_envelope();
4031 if !env.jwt.is_empty() {
4032 envelope.jwt = env.jwt;
4033 }
4034 if !env.spanid.is_empty() {
4035 envelope.spanid = env.spanid;
4036 }
4037 if !env.traceid.is_empty() {
4038 envelope.traceid = env.traceid;
4039 }
4040 let result = self.send(envelope, None).await;
4041 match result {
4042 Ok(m) => {
4043 let data = match m.data {
4044 Some(d) => d,
4045 None => {
4046 return Err(OpenIAPError::ClientError("No data in response".to_string()));
4047 }
4048 };
4049 if m.command == "error" {
4050 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
4051 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
4052 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
4053 }
4054 Ok(())
4057 }
4058 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
4059 }
4060 }
4061 #[tracing::instrument(skip_all)]
4063 pub async fn stop_agent(&self, env: EnvConfig, agentid: &str) -> Result<(), OpenIAPError> {
4064 let config = StopAgentRequest::byid(agentid);
4065 let mut envelope = config.to_envelope();
4066 if !env.jwt.is_empty() {
4067 envelope.jwt = env.jwt;
4068 }
4069 if !env.spanid.is_empty() {
4070 envelope.spanid = env.spanid;
4071 }
4072 if !env.traceid.is_empty() {
4073 envelope.traceid = env.traceid;
4074 }
4075 let result = self.send(envelope, None).await;
4076 match result {
4077 Ok(m) => {
4078 let data = match m.data {
4079 Some(d) => d,
4080 None => {
4081 return Err(OpenIAPError::ClientError("No data in response".to_string()));
4082 }
4083 };
4084 if m.command == "error" {
4085 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
4086 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
4087 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
4088 }
4089 Ok(())
4092 }
4093 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
4094 }
4095 }
4096 #[tracing::instrument(skip_all)]
4098 pub async fn delete_agent_pod(&self, env: EnvConfig, agentid: &str, podname: &str) -> Result<(), OpenIAPError> {
4099 let config = DeleteAgentPodRequest::byid(agentid, podname);
4100 let mut envelope = config.to_envelope();
4101 if !env.jwt.is_empty() {
4102 envelope.jwt = env.jwt;
4103 }
4104 if !env.spanid.is_empty() {
4105 envelope.spanid = env.spanid;
4106 }
4107 if !env.traceid.is_empty() {
4108 envelope.traceid = env.traceid;
4109 }
4110 let result = self.send(envelope, None).await;
4111 match result {
4112 Ok(m) => {
4113 let data = match m.data {
4114 Some(d) => d,
4115 None => {
4116 return Err(OpenIAPError::ClientError("No data in response".to_string()));
4117 }
4118 };
4119 if m.command == "error" {
4120 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
4121 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
4122 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
4123 }
4124 Ok(())
4127 }
4128 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
4129 }
4130 }
4131 #[tracing::instrument(skip_all)]
4133 pub async fn delete_agent(&self, env: EnvConfig, agentid: &str) -> Result<(), OpenIAPError> {
4134 let config = DeleteAgentRequest::byid(agentid);
4135 let mut envelope = config.to_envelope();
4136 if !env.jwt.is_empty() {
4137 envelope.jwt = env.jwt;
4138 }
4139 if !env.spanid.is_empty() {
4140 envelope.spanid = env.spanid;
4141 }
4142 if !env.traceid.is_empty() {
4143 envelope.traceid = env.traceid;
4144 }
4145 let result = self.send(envelope, None).await;
4146 match result {
4147 Ok(m) => {
4148 let data = match m.data {
4149 Some(d) => d,
4150 None => {
4151 return Err(OpenIAPError::ClientError("No data in response".to_string()));
4152 }
4153 };
4154 if m.command == "error" {
4155 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
4156 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
4157 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
4158 }
4159 Ok(())
4162 }
4163 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
4164 }
4165 }
4166 #[tracing::instrument(skip_all)]
4168 pub async fn get_agent_pods(&self, env: EnvConfig, agentid: &str, stats: bool) -> Result<String, OpenIAPError> {
4169 let config = GetAgentPodsRequest::byid(agentid, stats);
4170 let mut envelope = config.to_envelope();
4171 if !env.jwt.is_empty() {
4172 envelope.jwt = env.jwt;
4173 }
4174 if !env.spanid.is_empty() {
4175 envelope.spanid = env.spanid;
4176 }
4177 if !env.traceid.is_empty() {
4178 envelope.traceid = env.traceid;
4179 }
4180 let result = self.send(envelope, None).await;
4181 match result {
4182 Ok(m) => {
4183 let data = match m.data {
4184 Some(d) => d,
4185 None => {
4186 return Err(OpenIAPError::ClientError("No data in response".to_string()));
4187 }
4188 };
4189 if m.command == "error" {
4190 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
4191 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
4192 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
4193 }
4194 let response: GetAgentPodsResponse = prost::Message::decode(data.value.as_ref())
4195 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
4196 Ok(response.results)
4197 }
4198 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
4199 }
4200 }
4201 #[tracing::instrument(skip_all)]
4203 pub async fn get_agent_pod_logs(
4204 &self,
4205 env: EnvConfig,
4206 agentid: &str,
4207 podname: &str,
4208 ) -> Result<String, OpenIAPError> {
4209 let config = GetAgentLogRequest::new(agentid, podname);
4210 let mut envelope = config.to_envelope();
4211 if !env.jwt.is_empty() {
4212 envelope.jwt = env.jwt;
4213 }
4214 if !env.spanid.is_empty() {
4215 envelope.spanid = env.spanid;
4216 }
4217 if !env.traceid.is_empty() {
4218 envelope.traceid = env.traceid;
4219 }
4220 let result = self.send(envelope, None).await;
4221 match result {
4222 Ok(m) => {
4223 let data = match m.data {
4224 Some(d) => d,
4225 None => {
4226 return Err(OpenIAPError::ClientError("No data in response".to_string()));
4227 }
4228 };
4229 if m.command == "error" {
4230 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
4231 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
4232 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
4233 }
4234 let response: GetAgentLogResponse = prost::Message::decode(data.value.as_ref())
4235 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
4236 Ok(response.result)
4237 }
4238 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
4239 }
4240 }
4241
4242 #[tracing::instrument(skip_all)]
4245 pub async fn ensure_customer(
4246 &self,
4247 config: EnsureCustomerRequest,
4248 env: EnvConfig,
4249 ) -> Result<EnsureCustomerResponse, OpenIAPError> {
4250 if config.customer.is_none() && config.stripe.is_none() {
4251 return Err(OpenIAPError::ClientError(
4252 "No customer or stripe provided".to_string(),
4253 ));
4254 }
4255 let mut envelope = config.to_envelope();
4256 if !env.jwt.is_empty() {
4257 envelope.jwt = env.jwt;
4258 }
4259 if !env.spanid.is_empty() {
4260 envelope.spanid = env.spanid;
4261 }
4262 if !env.traceid.is_empty() {
4263 envelope.traceid = env.traceid;
4264 }
4265 let result = self.send(envelope, None).await;
4266 match result {
4267 Ok(m) => {
4268 let data = match m.data {
4269 Some(d) => d,
4270 None => {
4271 return Err(OpenIAPError::ClientError("No data in response".to_string()));
4272 }
4273 };
4274 if m.command == "error" {
4275 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
4276 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
4277 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
4278 }
4279 let response: EnsureCustomerResponse = prost::Message::decode(data.value.as_ref())
4280 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
4281 Ok(response)
4282 }
4283 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
4284 }
4285 }
4286 #[tracing::instrument(skip_all)]
4288 pub async fn create_workflow_instance(
4289 &self,
4290 config: CreateWorkflowInstanceRequest,
4291 env: EnvConfig,
4292 ) -> Result<String, OpenIAPError> {
4293 if config.workflowid.is_empty() {
4294 return Err(OpenIAPError::ClientError(
4295 "No workflow id provided".to_string(),
4296 ));
4297 }
4298 let mut envelope = config.to_envelope();
4299 if !env.jwt.is_empty() {
4300 envelope.jwt = env.jwt;
4301 }
4302 if !env.spanid.is_empty() {
4303 envelope.spanid = env.spanid;
4304 }
4305 if !env.traceid.is_empty() {
4306 envelope.traceid = env.traceid;
4307 }
4308 let result = self.send(envelope, None).await;
4309 match result {
4310 Ok(m) => {
4311 let data = match m.data {
4312 Some(d) => d,
4313 None => {
4314 return Err(OpenIAPError::ClientError("No data in response".to_string()));
4315 }
4316 };
4317 if m.command == "error" {
4318 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
4319 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
4320 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
4321 }
4322 let response: CreateWorkflowInstanceResponse =
4323 prost::Message::decode(data.value.as_ref())
4324 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
4325 Ok(response.instanceid)
4326 }
4327 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
4328 }
4329 }
4330
4331 #[tracing::instrument(skip_all)]
4333 pub async fn invoke_openrpa(
4334 &self,
4335 config: InvokeOpenRpaRequest,
4336 env: EnvConfig,
4337 timeout: Option<tokio::time::Duration>,
4338 ) -> Result<String, OpenIAPError> {
4339 if config.robotid.is_empty() {
4340 return Err(OpenIAPError::ClientError(
4341 "No robot id provided".to_string(),
4342 ));
4343 }
4344 if config.workflowid.is_empty() {
4345 return Err(OpenIAPError::ClientError(
4346 "No workflow id provided".to_string(),
4347 ));
4348 }
4349
4350 let (tx, rx) = oneshot::channel::<String>();
4351 let tx = Arc::new(std::sync::Mutex::new(Some(tx)));
4352
4353 let q = self
4354 .register_queue(
4355 RegisterQueueRequest {
4356 queuename: "".to_string(),
4357 },
4358 crate::EnvConfig::new(),
4359 Arc::new(move |_client: Arc<Client>, event: QueueEvent| {
4360 let tx = tx.clone();
4361 Box::pin(async move {
4362 let json = event.data.clone();
4363 let obj = serde_json::from_str::<serde_json::Value>(&json).unwrap();
4364 let command: String = obj["command"].as_str().unwrap().to_string();
4365 debug!("Received event: {:?}", event);
4366 if command.eq("invokesuccess") {
4367 debug!("Robot successfully started running workflow");
4368 } else if command.eq("invokeidle") {
4369 debug!("Workflow went idle");
4370 } else if command.eq("invokeerror") {
4371 debug!("Robot failed to run workflow");
4372 let tx = tx.lock().unwrap().take().unwrap();
4373 tx.send(event.data).unwrap();
4374 } else if command.eq("timeout") {
4375 debug!("No robot picked up the workflow");
4376 let tx = tx.lock().unwrap().take().unwrap();
4377 tx.send(event.data).unwrap();
4378 } else if command.eq("invokecompleted") {
4379 debug!("Robot completed running workflow");
4380 let tx = tx.lock().unwrap().take().unwrap();
4381 tx.send(event.data).unwrap();
4382 } else {
4383 let tx = tx.lock().unwrap().take().unwrap();
4384 tx.send(event.data).unwrap();
4385 }
4386 None
4387 })
4388 }),
4389 )
4390 .await
4391 .unwrap();
4392 debug!("Registered Response Queue: {:?}", q);
4393 let data = format!(
4394 "{{\"command\":\"invoke\",\"workflowid\":\"{}\",\"data\": {}}}",
4395 config.workflowid, config.payload
4396 );
4397 debug!("Send Data: {}", data);
4398 debug!("To Queue: {} With reply to: {}", config.robotid, q);
4399 let config = QueueMessageRequest {
4400 queuename: config.robotid.clone(),
4401 replyto: q.clone(),
4402 data,
4403 ..Default::default()
4404 };
4405 let mut envelope = config.to_envelope();
4406 if !env.jwt.is_empty() {
4407 envelope.jwt = env.jwt;
4408 }
4409 if !env.spanid.is_empty() {
4410 envelope.spanid = env.spanid;
4411 }
4412 if !env.traceid.is_empty() {
4413 envelope.traceid = env.traceid;
4414 }
4415 let result = self.send(envelope, timeout).await;
4416 match result {
4417 Ok(m) => {
4418 let data = match m.data {
4419 Some(d) => d,
4420 None => {
4421 return Err(OpenIAPError::ClientError("No data in response".to_string()))
4422 }
4423 };
4424 if m.command == "error" {
4425 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
4426 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
4427 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
4428 }
4429 let duration = timeout.unwrap_or_else(|| self.get_default_timeout());
4433 let json = match tokio::time::timeout(duration, rx).await {
4435 Ok(Ok(val)) => {
4436 let _ = self.unregister_queue(crate::EnvConfig::new(), &q).await;
4438 val
4439 },
4440 Ok(Err(e)) => {
4441 let _ = self.unregister_queue(crate::EnvConfig::new(), &q).await;
4442 return Err(OpenIAPError::CustomError(e.to_string()));
4443 },
4444 Err(_) => {
4445 let _ = self.unregister_queue(crate::EnvConfig::new(), &q).await;
4446 return Err(OpenIAPError::ServerError("Timeout".to_string()));
4447 },
4448 };
4449 debug!("Received json result: {:?}", json);
4450 let obj = serde_json::from_str::<serde_json::Value>(&json).unwrap();
4451 let command: String = obj["command"].as_str().unwrap().to_string();
4452 let mut data = "".to_string();
4453 if obj["data"].as_str().is_some() {
4454 data = obj["data"].as_str().unwrap().to_string();
4455 } else if obj["data"].as_object().is_some() {
4456 data = obj["data"].to_string();
4457 }
4458 if !command.eq("invokecompleted") {
4459 if command.eq("timeout") {
4460 return Err(OpenIAPError::ServerError("Timeout".to_string()));
4461 } else {
4462 if data.is_empty() {
4463 return Err(OpenIAPError::ServerError(
4464 "Error with no message".to_string(),
4465 ));
4466 }
4467 return Err(OpenIAPError::ServerError(data));
4468 }
4469 }
4470 Ok(data)
4480 }
4481 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
4482 }
4483 }
4484}
4485