1#![warn(missing_docs)]
2pub use openiap_proto::errors::*;
25pub use openiap_proto::protos::*;
26pub use openiap_proto::*;
27pub use prost_types::Timestamp;
28pub use protos::flow_service_client::FlowServiceClient;
29use sqids::Sqids;
30
31use tokio::task::JoinHandle;
32use tokio_tungstenite::{WebSocketStream};
33use tracing::{debug, error, info, trace};
34type StdError = Box<dyn std::error::Error + Send + Sync + 'static>;
35type Result<T, E = StdError> = ::std::result::Result<T, E>;
36use std::fs::File;
37use std::io::{Read, Write};
38use std::sync::atomic::{AtomicUsize, Ordering};
39use std::sync::Arc;
40use tokio::sync::Mutex;
41use tonic::transport::Channel;
42
43use tokio::sync::{mpsc, oneshot};
44
45use std::env;
46use std::time::Duration;
47
48#[cfg(feature = "otel")]
49mod otel;
50mod tests;
51mod ws;
52mod grpc;
53mod util;
54pub use crate::util::{enable_tracing, disable_tracing};
55
56type QuerySender = oneshot::Sender<Envelope>;
57type StreamSender = mpsc::Sender<Vec<u8>>;
58type Sock = WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>;
59use futures::{StreamExt };
60use async_channel::{unbounded};
61const VERSION: &str = "0.0.20";
62
63
64#[derive(Clone)]
67pub struct Client {
68 connect_called: Arc<std::sync::Mutex<bool>>,
70 runtime: Arc<std::sync::Mutex<Option<tokio::runtime::Runtime>>>,
72
73 stats: Arc<std::sync::Mutex<ClientStatistics>>,
75
76 task_handles: Arc<std::sync::Mutex<Vec<JoinHandle<()>>>>,
77 pub client: Arc<std::sync::Mutex<ClientEnum>>,
79 user: Arc<std::sync::Mutex<Option<User>>>,
81 pub inner: Arc<Mutex<ClientInner>>,
83 pub config: Arc<std::sync::Mutex<Option<Config>>>,
85 pub auto_reconnect: Arc<std::sync::Mutex<bool>>,
87 pub url: Arc<std::sync::Mutex<String>>,
89 pub username: Arc<std::sync::Mutex<String>>,
91 pub password: Arc<std::sync::Mutex<String>>,
93 pub jwt: Arc<std::sync::Mutex<String>>,
95 agent_name: Arc<std::sync::Mutex<String>>,
96 agent_version: Arc<std::sync::Mutex<String>>,
97 event_sender: async_channel::Sender<ClientEvent>,
98 event_receiver: async_channel::Receiver<ClientEvent>,
99 out_envelope_sender: async_channel::Sender<Envelope>,
100 out_envelope_receiver: async_channel::Receiver<Envelope>,
101 pub state: Arc<std::sync::Mutex<ClientState>>,
103 pub msgcount: Arc<std::sync::Mutex<i32>>,
105 pub reconnect_ms: Arc<std::sync::Mutex<i32>>,
107
108}
109#[derive(Clone, Default)]
111pub struct ClientStatistics {
112 connection_attempts: u64,
113 connections: u64,
114 package_tx: u64,
115 package_rx: u64,
116 signin: u64,
117 download: u64,
118 getdocumentversion: u64,
119 customcommand: u64,
120 listcollections: u64,
121 createcollection: u64,
122 dropcollection: u64,
123 ensurecustomer: u64,
124 invokeopenrpa: u64,
125 registerqueue: u64,
126 registerexchange: u64,
127 unregisterqueue: u64,
128 watch: u64,
129 unwatch: u64,
130 queuemessage: u64,
131 pushworkitem: u64,
132 pushworkitems: u64,
133 popworkitem: u64,
134 updateworkitem: u64,
135 deleteworkitem: u64,
136 addworkitemqueue: u64,
137 updateworkitemqueue: u64,
138 deleteworkitemqueue: u64,
139 getindexes: u64,
140 createindex: u64,
141 dropindex: u64,
142 upload: u64,
143 query: u64,
144 count: u64,
145 distinct: u64,
146 aggregate: u64,
147 insertone: u64,
148 insertmany: u64,
149 insertorupdateone: u64,
150 insertorupdatemany: u64,
151 updateone: u64,
152 updatedocument: u64,
153 deleteone: u64,
154 deletemany: u64,
155}
156#[derive(Clone)]
158pub struct ClientInner {
159 pub queries: Arc<Mutex<std::collections::HashMap<String, QuerySender>>>,
161 pub streams: Arc<Mutex<std::collections::HashMap<String, StreamSender>>>,
163 #[allow(clippy::type_complexity)]
165 pub watches:
166 Arc<Mutex<std::collections::HashMap<String, Box<dyn Fn(WatchEvent) + Send + Sync>>>>,
167 #[allow(clippy::type_complexity)]
169 pub queues:
170 Arc<Mutex<std::collections::HashMap<String, Box<dyn Fn(QueueEvent) + Send + Sync>>>>,
171}
172#[derive(Clone, Debug)]
174pub enum ClientEnum {
175 None,
177 Grpc(FlowServiceClient<tonic::transport::Channel>),
179 WS(Arc<Mutex<Sock>>)
181}
182#[derive(Debug, Clone, PartialEq)]
184pub enum ClientEvent {
185 Connecting,
187 Connected,
189 Disconnected(String),
191 SignedIn,
193 }
206#[derive(Debug, Clone, PartialEq)]
208pub enum ClientState {
209 Disconnected,
211 Connecting,
213 Connected,
215 Signedin
217}
218#[derive(Debug, Clone, serde::Deserialize)]
220#[allow(dead_code)]
221pub struct Config {
222 #[serde(default)]
223 wshost: String,
224 #[serde(default)]
225 wsurl: String,
226 #[serde(default)]
227 domain: String,
228 #[serde(default)]
229 auto_create_users: bool,
230 #[serde(default)]
231 namespace: String,
232 #[serde(default)]
233 agent_domain_schema: String,
234 #[serde(default)]
235 version: String,
236 #[serde(default)]
237 validate_emails: bool,
238 #[serde(default)]
239 forgot_pass_emails: bool,
240 #[serde(default)]
241 supports_watch: bool,
242 #[serde(default)]
243 amqp_enabled_exchange: bool,
244 #[serde(default)]
245 multi_tenant: bool,
246 #[serde(default)]
247 enable_entity_restriction: bool,
248 #[serde(default)]
249 enable_web_tours: bool,
250 #[serde(default)]
251 collections_with_text_index: Vec<String>,
252 #[serde(default)]
253 timeseries_collections: Vec<String>,
254 #[serde(default)]
255 ping_clients_interval: i32,
256 #[serde(default)]
257 validlicense: bool,
258 #[serde(default)]
259 forceddomains: Vec<String>,
260 #[serde(default)]
261 grafana_url: String,
262 #[serde(default)]
263 otel_metric_url: String,
264 #[serde(default)]
265 enable_analytics: bool,
266}
267impl std::fmt::Debug for ClientInner {
268 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
269 f.debug_struct("ClientInner")
270 .field("queries", &self.queries)
272 .field("streams", &self.streams)
273 .finish()
274 }
275}
276impl Default for Client {
277 fn default() -> Self {
278 Self::new()
279 }
280}
281
282impl Client {
283 pub fn new() -> Self {
285 let (ces, cer) = unbounded::<ClientEvent>();
286 let (out_es, out_er) = unbounded::<Envelope>();
287 Self {
288 task_handles: Arc::new(std::sync::Mutex::new(Vec::new())),
289 stats: Arc::new(std::sync::Mutex::new(ClientStatistics::default())),
290 user: Arc::new(std::sync::Mutex::new(None)),
291 client: Arc::new(std::sync::Mutex::new(ClientEnum::None)),
292 connect_called: Arc::new(std::sync::Mutex::new(false)),
293 runtime: Arc::new(std::sync::Mutex::new(None)),
294 msgcount: Arc::new(std::sync::Mutex::new(-1)),
295 reconnect_ms: Arc::new(std::sync::Mutex::new(1000)),
296 inner: Arc::new(Mutex::new(ClientInner {
297 queries: Arc::new(Mutex::new(std::collections::HashMap::new())),
298 streams: Arc::new(Mutex::new(std::collections::HashMap::new())),
299 watches: Arc::new(Mutex::new(std::collections::HashMap::new())),
300 queues: Arc::new(Mutex::new(std::collections::HashMap::new())),
301 })),
302 config: Arc::new(std::sync::Mutex::new(None)),
303 auto_reconnect: Arc::new(std::sync::Mutex::new(true)),
304 url: Arc::new(std::sync::Mutex::new("".to_string())),
305 username: Arc::new(std::sync::Mutex::new("".to_string())),
306 password: Arc::new(std::sync::Mutex::new("".to_string())),
307 jwt: Arc::new(std::sync::Mutex::new("".to_string())),
308 agent_name: Arc::new(std::sync::Mutex::new("rust".to_string())),
309 agent_version: Arc::new(std::sync::Mutex::new(VERSION.to_string())),
310 event_sender: ces,
311 event_receiver: cer,
312 out_envelope_sender: out_es,
313 out_envelope_receiver: out_er,
314 state: Arc::new(std::sync::Mutex::new(ClientState::Disconnected)),
315 }
316 }
317 #[tracing::instrument(skip_all)]
319 pub fn connect(&self, dst: &str) -> Result<(), OpenIAPError> {
320 let rt = match tokio::runtime::Runtime::new() {
321 Ok(rt) => rt,
322 Err(e) => {
323 return Err(OpenIAPError::ClientError(format!(
324 "Failed to create tokio runtime: {}",
325 e
326 )));
327 }
328 };
329 self.set_runtime(Some(rt));
330 tokio::task::block_in_place(|| {
331 let handle = self.get_runtime_handle();
332 handle.block_on(self.connect_async(dst))
333 })
334 }
335
336 #[allow(unused_variables)]
338 pub async fn load_config(&self, strurl: &str, url: &url::Url) -> Option<Config> {
339 let config: Option<Config>;
340 let issecure = url.scheme() == "https" || url.scheme() == "wss" || url.port() == Some(443);
341 let mut port = url.port().unwrap_or(80);
342 if port == 50051 {
343 port = 3000;
344 }
345 let configurl = if issecure {
346 format!(
347 "{}://{}:{}/config",
348 "https",
349 url.host_str()
350 .unwrap_or("localhost.openiap.io")
351 .replace("grpc.", ""),
352 port
353 )
354 } else {
355 format!(
356 "{}://{}:{}/config",
357 "http",
358 url.host_str()
359 .unwrap_or("localhost.openiap.io")
360 .replace("grpc.", ""),
361 port
362 )
363 };
364
365 let configurl = url::Url::parse(configurl.as_str())
366 .map_err(|e| OpenIAPError::ClientError(format!("Failed to parse URL: {}", e))).expect("wefew");
367 let o = minreq::get(configurl).send();
368 match o {
369 Ok(_) => {
370 let response = match o {
371 Ok(response) => response,
372 Err(e) => {
373 error!("Failed to get config: {}", e);
374 return None;
375 }
376 };
377 if response.status_code == 200 {
378 let body = response.as_str().unwrap();
379 config = Some(match serde_json::from_str(body) {
380 Ok(config) => config,
381 Err(e) => {
382 error!("Failed to parse config: {}", e);
383 return None;
384 }
385 });
386 } else {
387 config = None;
388 }
389 }
390 Err(e) => {
391 error!("Failed to get config: {}", e);
392 return None;
393 }
394 }
395 let mut _enable_analytics = true;
396 let mut _otel_metric_url = std::env::var("OTEL_METRIC_URL").unwrap_or_default();
397 let mut apihostname = url.host_str().unwrap_or("localhost.openiap.io").to_string();
398 if apihostname.starts_with("grpc.") {
399 apihostname = apihostname[5..].to_string();
400 }
401
402 if config.is_some() {
403 let config = config.as_ref().unwrap();
404 if !config.otel_metric_url.is_empty() {
405 _otel_metric_url = config.otel_metric_url.clone();
406 }
407 if !config.domain.is_empty() {
408 apihostname = config.domain.clone();
409 }
410 _enable_analytics = config.enable_analytics;
411 }
412 #[cfg(feature = "otel")]
413 if _enable_analytics {
414 let agent_name = self.get_agent_name();
415 let agent_version = self.get_agent_version();
416 match otel::init_telemetry(&agent_name, &agent_version, VERSION, &apihostname, _otel_metric_url.as_str(), &self.stats) {
417 Ok(_) => (),
418 Err(e) => {
419 error!("Failed to initialize telemetry: {}", e);
420 return None;
421 }
422 }
423 }
424 config
425 }
426
427 #[tracing::instrument(skip_all)]
429 pub async fn connect_async(&self, dst: &str) -> Result<(), OpenIAPError> {
430 #[cfg(test)]
431 {
432 enable_tracing("openiap=error", "");
436 }
438 if self.is_connect_called() {
439 self.set_auto_reconnect(true);
440 return self.reconnect().await;
441 }
442 let mut strurl = dst.to_string();
443 if strurl.is_empty() {
444 strurl = std::env::var("apiurl").unwrap_or("".to_string());
445 if strurl.is_empty() {
446 strurl = std::env::var("grpcapiurl").unwrap_or("".to_string());
447 }
448 if strurl.is_empty() {
449 strurl = std::env::var("wsapiurl").unwrap_or("".to_string());
450 }
451 }
452 if strurl.is_empty() {
453 return Err(OpenIAPError::ClientError("No URL provided".to_string()));
454 }
455 let url = url::Url::parse(strurl.as_str())
456 .map_err(|e| OpenIAPError::ClientError(format!("Failed to parse URL: {}", e)))?;
457 let usegprc = url.scheme() == "grpc" || url.domain().unwrap_or("localhost").to_lowercase().starts_with("grpc.") || url.port() == Some(50051);
458 if url.scheme() != "http"
459 && url.scheme() != "https"
460 && url.scheme() != "grpc"
461 && url.scheme() != "ws"
462 && url.scheme() != "wss"
463 {
464 return Err(OpenIAPError::ClientError("Invalid URL scheme".to_string()));
465 }
466 if url.scheme() == "grpc" {
467 if url.port() == Some(443) {
468 strurl = format!("https://{}", url.host_str().unwrap_or("app.openiap.io"));
469 } else {
470 strurl = format!("http://{}:{}", url.host_str().unwrap_or("app.openiap.io"), url.port().unwrap_or(80));
471 }
472 }
473 let mut url = url::Url::parse(strurl.as_str())
474 .map_err(|e| OpenIAPError::ClientError(format!("Failed to parse URL: {}", e)))?;
475 let mut username = "".to_string();
476 let mut password = "".to_string();
477 if let Some(p) = url.password() {
478 password = p.to_string();
479 }
480 if !url.username().is_empty() {
481 username = url.username().to_string();
482 }
483 if !username.is_empty() && !password.is_empty() {
484 self.set_username(&username);
485 self.set_password(&password);
486 }
487 url = url::Url::parse(strurl.as_str())
488 .map_err(|e| OpenIAPError::ClientError(format!("Failed to parse URL: {}", e)))?;
489
490 if url.port().is_none() {
491 strurl = format!(
492 "{}://{}",
493 url.scheme(),
494 url.host_str().unwrap_or("app.openiap.io")
495 );
496 } else {
497 strurl = format!(
498 "{}://{}:{}",
499 url.scheme(),
500 url.host_str().unwrap_or("localhost.openiap.io"),
501 url.port().unwrap_or(80)
502 );
503 }
504 info!("Connecting to {}", strurl);
505 let config = self.load_config(strurl.as_str(), &url).await;
506 if !usegprc {
507 strurl = format!("{}/ws/v2", strurl);
508
509 let (_stream_tx, stream_rx) = mpsc::channel(60);
510
511 let socket = match tokio_tungstenite::connect_async(strurl.clone()).await {
512 Ok((socket, _)) => socket,
513 Err(e) => {
514 return Err(OpenIAPError::ClientError(format!(
515 "Failed to connect to WS: {}",
516 e
517 )));
518 }
519 };
520 self.set_client(ClientEnum::WS(Arc::new(Mutex::new(socket))));
521 self.set_connect_called(true);
522 self.set_config(config);
523 self.set_url(&strurl);
524 match self.setup_ws(&strurl).await {
525 Ok(_) => (),
526 Err(e) => {
527 return Err(OpenIAPError::ClientError(format!(
528 "Failed to setup WS: {}",
529 e
530 )));
531 }
532 }
533 let client2 = self.clone();
534 tokio::task::spawn(async move {
536 tokio_stream::wrappers::ReceiverStream::new(stream_rx)
537 .for_each(|envelope: Envelope| async {
538 let command = envelope.command.clone();
539 let rid = envelope.rid.clone();
540 let id = envelope.id.clone();
541 trace!("Received command: {}, id: {}, rid: {}", command, id, rid);
542 client2.parse_incomming_envelope(envelope).await;
543 })
544 .await;
545 }); } else {
547 if url.scheme() == "http" {
548 let response = Client::connect_grpc(strurl.clone()).await;
549 match response {
550 Ok(client) => {
551 self.set_client(ClientEnum::Grpc(client));
552 }
553 Err(e) => {
554 return Err(OpenIAPError::ClientError(format!(
555 "Failed to connect: {}",
556 e
557 )));
558 }
559 }
560 } else {
561 let uri = tonic::transport::Uri::builder()
562 .scheme(url.scheme())
563 .authority(url.host().unwrap().to_string())
564 .path_and_query("/")
565 .build();
566 let uri = match uri {
567 Ok(uri) => uri,
568 Err(e) => {
569 return Err(OpenIAPError::ClientError(format!(
570 "Failed to build URI: {}",
571 e
572 )));
573 }
574 };
575 let channel = Channel::builder(uri)
576 .tls_config(tonic::transport::ClientTlsConfig::new().with_native_roots());
577 let channel = match channel {
578 Ok(channel) => channel,
579 Err(e) => {
580 return Err(OpenIAPError::ClientError(format!(
581 "Failed to build channel: {}",
582 e
583 )));
584 }
585 };
586 let channel = channel.connect().await;
587 let channel = match channel {
588 Ok(channel) => channel,
589 Err(e) => {
590 return Err(OpenIAPError::ClientError(format!(
591 "Failed to connect: {}",
592 e
593 )));
594 }
595 };
596 self.set_client(ClientEnum::Grpc(FlowServiceClient::new(channel)));
597 }
598 self.set_connect_called(true);
599 self.set_config(config);
600 self.set_url(&strurl);
601 self.setup_grpc_stream().await?;
602 };
603 self.post_connected().await
604 }
605
606 #[tracing::instrument(skip_all)]
635 pub async fn new_connect(dst: &str) -> Result<Self, OpenIAPError> {
636 #[cfg(test)]
637 {
638 enable_tracing("openiap=error", "");
642 }
644 let client = Client::new();
645 client.connect_async(dst).await?;
646 Ok(client)
647 }
648 pub async fn post_connected(&self) -> Result<(), OpenIAPError> {
650 if self.get_username().is_empty() && self.get_password().is_empty() {
651 self.set_username(&std::env::var("OPENIAP_USERNAME").unwrap_or_default());
652 self.set_password(&std::env::var("OPENIAP_PASSWORD").unwrap_or_default());
653 }
654 if !self.get_username().is_empty() && !self.get_password().is_empty() {
655 debug!("Signing in with username: {}", self.get_username());
656 let signin = SigninRequest::with_userpass(self.get_username().as_str(), self.get_password().as_str());
657 let loginresponse = self.signin(signin).await;
658 match loginresponse {
659 Ok(response) => {
660 self.reset_reconnect_ms();
661 self.set_connected(ClientState::Connected, None);
662 info!("Signed in as {}", response.user.as_ref().unwrap().username);
663 Ok(())
664 }
665 Err(_e) => {
666 self.set_connected(ClientState::Disconnected, Some(&_e.to_string()));
667 Err(_e)
668 }
669 }
670 } else {
671 self.set_jwt(&std::env::var("OPENIAP_JWT").unwrap_or_default());
672 if self.get_jwt().is_empty() {
673 self.set_jwt(&std::env::var("jwt").unwrap_or_default());
674 }
675 if !self.get_jwt().is_empty() {
676 debug!("Signing in with JWT");
677 let signin = SigninRequest::with_jwt(self.get_jwt().as_str());
678 let loginresponse = self.signin(signin).await;
679 match loginresponse {
680 Ok(response) => match response.user {
681 Some(user) => {
682 self.reset_reconnect_ms();
683 info!("Signed in as {}", user.username);
684 self.set_connected(ClientState::Connected, None);
685 Ok(())
686 }
687 None => {
688 self.reset_reconnect_ms();
689 info!("Signed in as guest");
690 self.set_connected(ClientState::Connected, None);
691 Ok(())
692 }
694 },
695 Err(_e) => {
696 self.set_connected(ClientState::Disconnected, Some(&_e.to_string()));
697 Err(_e)
698 }
699 }
700 } else {
701 self.reset_reconnect_ms();
702 match self.get_element().await {
703 Ok(_) => {
704 debug!("Connected, No credentials provided so is running as guest");
705 self.set_connected(ClientState::Connected, None);
706 Ok(())
707 },
708 Err(e) => {
709 self.set_connected(ClientState::Disconnected, Some(&e.to_string()));
710 Err(e)
711 }
712 }
713 }
714 }
715 }
716 #[tracing::instrument(skip_all)]
718 pub async fn reconnect(&self) -> Result<(), OpenIAPError> {
719 let state = self.get_state();
720 if state == ClientState::Connected || state == ClientState::Signedin {
721 return Ok(());
722 }
723 if !self.is_auto_reconnect() {
724 return Ok(());
725 }
726 let client = self.get_client();
727
728 match client {
729 ClientEnum::WS(ref _client) => {
730 info!("Reconnecting to {} ({} ms)", self.get_url(), (self.get_reconnect_ms() - 500));
731 self.setup_ws(&self.get_url()).await?;
732 debug!("Completed reconnecting to websocket");
733 self.post_connected().await
734 }
735 ClientEnum::Grpc(ref _client) => {
736 info!("Reconnecting to {} ({} ms)", self.get_url(), (self.get_reconnect_ms() - 500));
737 match self.setup_grpc_stream().await {
738 Ok(_) => {
739 debug!("Completed reconnecting to gRPC");
740 self.post_connected().await
741 },
742 Err(e) => {
743 return Err(OpenIAPError::ClientError(format!(
744 "Failed to setup gRPC stream: {}",
745 e
746 )));
747 }
748 }
749 }
750 ClientEnum::None => {
751 return Err(OpenIAPError::ClientError("Invalid client".to_string()));
752 }
753 }
754 }
755 pub fn disconnect(&self) {
757 self.set_auto_reconnect(false);
758 self.set_connected(ClientState::Disconnected, Some("Disconnected"));
759 }
760 pub fn set_connected(&self, state: ClientState, message: Option<&str>) {
762 {
763 let current = self.get_state();
764 trace!("Set connected: {:?} from {:?}", state, current);
765 if state == ClientState::Connected && current == ClientState::Signedin {
766 self.set_state(ClientState::Signedin);
767 } else {
768 self.set_state(state.clone());
769 }
770 if state == ClientState::Connecting && !current.eq(&state) {
771 if let Ok(_handle) = tokio::runtime::Handle::try_current() {
772 self.stats.lock().unwrap().connection_attempts += 1;
773 let me = self.clone();
774 tokio::task::spawn(async move {
775 me.event_sender.send(crate::ClientEvent::Connecting).await.unwrap();
776 });
777 }
786
787 }
788 if (state == ClientState::Connected|| state == ClientState::Signedin) && (current == ClientState::Disconnected || current == ClientState::Connecting) {
789 if let Ok(_handle) = tokio::runtime::Handle::try_current() {
790 self.stats.lock().unwrap().connections += 1;
791 let me = self.clone();
792 tokio::task::spawn(async move {
793 me.event_sender.send(crate::ClientEvent::Connected).await.unwrap();
794 });
795 }
804 }
805 if state == ClientState::Signedin && current != ClientState::Signedin {
806 if let Ok(_handle) = tokio::runtime::Handle::try_current() {
807 let me = self.clone();
808 tokio::task::spawn(async move {
809 me.event_sender.send(crate::ClientEvent::SignedIn).await.unwrap();
810 });
811 }
820 }
821 if state == ClientState::Disconnected && !current.eq(&state) {
822 if message.is_some() {
823 info!("Disconnected: {}", message.unwrap());
824 } else {
825 info!("Disconnected");
826 }
827 if let Ok(_handle) = tokio::runtime::Handle::try_current() {
828 let me = self.clone();
829 let message = match message {
830 Some(message) => message.to_string(),
831 None => "".to_string(),
832 };
833 tokio::task::spawn(async move {
835 me.event_sender.send(crate::ClientEvent::Disconnected(message)).await.unwrap();
836 });
837 }
848
849 self.kill_handles();
850 if let Ok(_handle) = tokio::runtime::Handle::try_current() {
851 let client = self.clone();
852 tokio::task::spawn(async move {
854 {
855 let inner = client.inner.lock().await;
856 let mut queries = inner.queries.lock().await;
857 let ids = queries.keys().cloned().collect::<Vec<String>>();
858 debug!("********************************************** Cleaning up");
859 for id in ids {
860 let err = ErrorResponse {
861 code: 500,
862 message: "Disconnected".to_string(),
863 stack: "".to_string(),
864 };
865 let envelope = err.to_envelope();
866 let tx = queries.remove(&id).unwrap();
867 debug!("kill query: {}", id);
868 let _ = tx.send(envelope);
869 }
870 let mut streams = inner.streams.lock().await;
871 let ids = streams.keys().cloned().collect::<Vec<String>>();
872 for id in ids {
873 let tx = streams.remove(&id).unwrap();
874 debug!("kill stream: {}", id);
875 let _ = tx.send(Vec::new()).await;
876 }
877 let mut queues = inner.queues.lock().await;
878 let ids = queues.keys().cloned().collect::<Vec<String>>();
879 for id in ids {
880 let _ = queues.remove(&id).unwrap();
881 }
882 let mut watches = inner.watches.lock().await;
883 let ids = watches.keys().cloned().collect::<Vec<String>>();
884 for id in ids {
885 let _ = watches.remove(&id).unwrap();
886 }
887 debug!("**********************************************************");
888 }
889 if client.is_auto_reconnect() {
890 trace!("Reconnecting in {} seconds", client.get_reconnect_ms() / 1000);
891 tokio::time::sleep(Duration::from_millis(client.get_reconnect_ms() as u64)).await;
892 if client.is_auto_reconnect() {
893 client.inc_reconnect_ms();
894 trace!("Reconnecting . . .");
896 client.reconnect().await.unwrap_or_else(|e| {
897 error!("Failed to reconnect: {}", e);
898 client.set_connected(ClientState::Disconnected, Some(&e.to_string()));
899 });
900 } else {
901 debug!("Not reconnecting");
902 }
903 } else {
904 debug!("Reconnecting disabled, stop now");
905 }
906 });
907 }
914
915 }
916 }
917 }
918 pub fn get_state(&self) -> ClientState {
920 let conn = self.state.lock().unwrap();
921 conn.clone()
922 }
923 pub fn set_state(&self, state: ClientState) {
925 let mut conn = self.state.lock().unwrap();
926 *conn = state;
927 }
928 pub fn set_msgcount(&self, msgcount: i32) {
930 let mut current = self.msgcount.lock().unwrap();
931 trace!("Set msgcount: {} from {}", msgcount, *current);
932 *current = msgcount;
933 }
934 pub fn inc_msgcount(&self) -> i32 {
936 let mut current = self.msgcount.lock().unwrap();
937 *current += 1;
938 *current
939 }
940 pub fn get_reconnect_ms(&self) -> i32 {
942 let reconnect_ms = self.reconnect_ms.lock().unwrap();
943 *reconnect_ms
944 }
945 pub fn reset_reconnect_ms(&self) {
947 let mut current = self.reconnect_ms.lock().unwrap();
948 *current = 500;
949 }
950 pub fn inc_reconnect_ms(&self) -> i32 {
952 let mut current = self.reconnect_ms.lock().unwrap();
953 if *current < 30000 {
954 *current += 500;
955 }
956 *current
957 }
958
959 pub fn push_handle(&self, handle: tokio::task::JoinHandle<()>) {
961 let mut handles = self.task_handles.lock().unwrap();
962 handles.push(handle);
963 }
964 pub fn kill_handles(&self) {
966 let mut handles = self.task_handles.lock().unwrap();
967 for handle in handles.iter() {
968 debug!("Killing handle");
971 if !handle.is_finished() {
972 handle.abort();
973 }
974 }
975 handles.clear();
976 }
983
984
985 #[tracing::instrument(skip_all)]
987 fn get_msgcount(&self) -> i32 {
988 let msgcount = self.msgcount.lock().unwrap();
989 *msgcount
990 }
991
992 #[tracing::instrument(skip_all)]
994 pub fn set_connect_called(&self, connect_called: bool) {
995 let mut current = self.connect_called.lock().unwrap();
996 trace!("Set connect_called: {} from {}", connect_called, *current);
997 *current = connect_called;
998 }
999 #[tracing::instrument(skip_all)]
1001 fn is_connect_called(&self) -> bool {
1002 let connect_called = self.connect_called.lock().unwrap();
1003 *connect_called
1004 }
1005 #[tracing::instrument(skip_all)]
1007 pub fn set_auto_reconnect(&self, auto_reconnect: bool) {
1008 let mut current = self.auto_reconnect.lock().unwrap();
1009 trace!("Set auto_reconnect: {} from {}", auto_reconnect, *current);
1010 *current = auto_reconnect;
1011 }
1012 #[tracing::instrument(skip_all)]
1014 fn is_auto_reconnect(&self) -> bool {
1015 let auto_reconnect = self.auto_reconnect.lock().unwrap();
1016 *auto_reconnect
1017 }
1018 #[tracing::instrument(skip_all)]
1020 pub fn set_url(&self, url: &str) {
1021 let mut current = self.url.lock().unwrap();
1022 trace!("Set url: {} from {}", url, *current);
1023 *current = url.to_string();
1024 }
1025 #[tracing::instrument(skip_all)]
1027 fn get_url(&self) -> String {
1028 let url = self.url.lock().unwrap();
1029 url.to_string()
1030 }
1031 #[tracing::instrument(skip_all)]
1033 pub fn set_username(&self, username: &str) {
1034 let mut current = self.username.lock().unwrap();
1035 trace!("Set username: {} from {}", username, *current);
1036 *current = username.to_string();
1037 }
1038 #[tracing::instrument(skip_all)]
1040 fn get_username(&self) -> String {
1041 let username = self.username.lock().unwrap();
1042 username.to_string()
1043 }
1044 #[tracing::instrument(skip_all)]
1046 pub fn set_password(&self, password: &str) {
1047 let mut current = self.password.lock().unwrap();
1048 trace!("Set password: {} from {}", password, *current);
1049 *current = password.to_string();
1050 }
1051 #[tracing::instrument(skip_all)]
1053 fn get_password(&self) -> String {
1054 let password = self.password.lock().unwrap();
1055 password.to_string()
1056 }
1057 #[tracing::instrument(skip_all)]
1059 pub fn set_jwt(&self, jwt: &str) {
1060 let mut current = self.jwt.lock().unwrap();
1061 trace!("Set jwt: {} from {}", jwt, *current);
1062 *current = jwt.to_string();
1063 }
1064 #[tracing::instrument(skip_all)]
1066 fn get_jwt(&self) -> String {
1067 let jwt = self.jwt.lock().unwrap();
1068 jwt.to_string()
1069 }
1070 #[tracing::instrument(skip_all)]
1072 pub fn set_agent_name(&self, agent: &str) {
1073 let mut current = self.agent_name.lock().unwrap();
1074 trace!("Set agent: {} from {}", agent, *current);
1075 *current = agent.to_string();
1076 }
1077 #[tracing::instrument(skip_all)]
1079 pub fn get_agent_name(&self) -> String {
1080 let agent = self.agent_name.lock().unwrap();
1081 agent.to_string()
1082 }
1083 #[tracing::instrument(skip_all)]
1085 pub fn set_agent_version(&self, version: &str) {
1086 let mut current = self.agent_version.lock().unwrap();
1087 trace!("Set agent version: {} from {}", version, *current);
1088 *current = version.to_string();
1089 }
1090 #[tracing::instrument(skip_all)]
1092 pub fn get_agent_version(&self) -> String {
1093 let agent_version = self.agent_version.lock().unwrap();
1094 agent_version.to_string()
1095 }
1096
1097 #[tracing::instrument(skip_all)]
1099 pub fn set_config(&self, config: Option<Config>) {
1100 let mut current = self.config.lock().unwrap();
1101 *current = config;
1102 }
1103 #[tracing::instrument(skip_all)]
1105 pub fn get_config(&self) -> Option<Config> {
1106 let config = self.config.lock().unwrap();
1107 config.clone()
1108 }
1109 #[tracing::instrument(skip_all)]
1111 pub fn set_client(&self, client: ClientEnum) {
1112 let mut current = self.client.lock().unwrap();
1113 *current = client;
1114 }
1115 #[tracing::instrument(skip_all)]
1117 fn get_client(&self) -> ClientEnum {
1118 let client = self.client.lock().unwrap();
1119 client.clone()
1120 }
1121 #[tracing::instrument(skip_all)]
1123 pub fn set_user(&self, user: Option<User>) {
1124 let mut current = self.user.lock().unwrap();
1125 *current = user;
1126 }
1127 #[tracing::instrument(skip_all)]
1129 pub fn get_user(&self) -> Option<User> {
1130 let user = self.user.lock().unwrap();
1131 user.clone()
1132 }
1133 #[tracing::instrument(skip_all)]
1143 pub fn set_runtime(&self, runtime: Option<tokio::runtime::Runtime>) {
1144 let mut current = self.runtime.lock().unwrap();
1145 *current = runtime;
1146 }
1147 #[tracing::instrument(skip_all)]
1149 pub fn get_runtime(&self) -> &std::sync::Mutex<std::option::Option<tokio::runtime::Runtime>> {
1151 self.runtime.as_ref()
1152 }
1153 #[tracing::instrument(skip_all)]
1155 pub fn get_runtime_handle(&self) -> tokio::runtime::Handle {
1156 let mut rt = self.runtime.lock().unwrap();
1157 if rt.is_none() {
1158 let runtime = tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime");
1160 *rt = Some(runtime);
1161 } else {
1162 }
1164 rt.as_ref().unwrap().handle().clone()
1165 }
1166 #[tracing::instrument(skip_all)]
1168 pub async fn on_event(&self, callback: Box<dyn Fn(ClientEvent) + Send + Sync>)
1169 {
1170 let event_receiver = self.event_receiver.clone();
1172 let callback = callback;
1173 let _handle = tokio::task::spawn(async move {
1174 while let Ok(event) = event_receiver.recv().await {
1175 callback(event);
1176 }
1177 }); }
1179 #[tracing::instrument(skip_all)]
1181 pub fn get_uniqueid() -> String {
1182 static COUNTER: AtomicUsize = AtomicUsize::new(1);
1183 let num1 = COUNTER.fetch_add(1, Ordering::Relaxed) as u64;
1184 let num2 = COUNTER.fetch_add(1, Ordering::Relaxed) as u64;
1185 let num3 = COUNTER.fetch_add(1, Ordering::Relaxed) as u64;
1186 let sqids = Sqids::default();
1187 sqids.encode(&[num1, num2, num3 ]).unwrap().to_string()
1188 }
1189 #[tracing::instrument(skip_all)]
1191 async fn send(&self, msg: Envelope) -> Result<Envelope, OpenIAPError> {
1192 let response = self.send_noawait(msg).await;
1193 match response {
1194 Ok((response_rx, id)) => {
1195 let response = response_rx.await;
1197
1198 let inner = self.inner.lock().await;
1200 inner.queries.lock().await.remove(&id);
1201
1202 match response {
1204 Ok(response) => Ok(response),
1205 Err(e) => Err(OpenIAPError::CustomError(e.to_string())),
1206 }
1207 }
1208 Err(e) => Err(OpenIAPError::CustomError(e.to_string())),
1209 }
1210 }
1211 #[tracing::instrument(skip_all)]
1214 async fn send_noawait(
1215 &self,
1216 mut msg: Envelope,
1217 ) -> Result<(oneshot::Receiver<Envelope>, String), OpenIAPError> {
1218 let (response_tx, response_rx) = oneshot::channel();
1219 let id = Client::get_uniqueid();
1220 msg.id = id.clone();
1221
1222 {
1224 let inner = self.inner.lock().await;
1225 inner.queries.lock().await.insert(id.clone(), response_tx);
1226 }
1227
1228 let res = self.send_envelope(msg).await;
1230 if let Err(e) = res {
1231 let inner = self.inner.lock().await;
1233 inner.queries.lock().await.remove(&id);
1234 return Err(OpenIAPError::ClientError(e.to_string()));
1235 }
1236
1237 Ok((response_rx, id))
1238 }
1239 #[tracing::instrument(skip_all)]
1241 async fn sendwithstream(
1242 &self,
1243 mut msg: Envelope,
1244 ) -> Result<(oneshot::Receiver<Envelope>, mpsc::Receiver<Vec<u8>>), OpenIAPError> {
1245 let (response_tx, response_rx) = oneshot::channel();
1246 let (stream_tx, stream_rx) = mpsc::channel(1024 * 1024);
1247 let id = Client::get_uniqueid();
1248 msg.id = id.clone();
1249 {
1250 let inner = self.inner.lock().await;
1251 inner.queries.lock().await.insert(id.clone(), response_tx);
1252 inner.streams.lock().await.insert(id.clone(), stream_tx);
1253 let res = self.send_envelope(msg).await;
1254 match res {
1255 Ok(_) => (),
1256 Err(e) => return Err(OpenIAPError::ClientError(e.to_string())),
1257 }
1258 }
1259 Ok((response_rx, stream_rx))
1260 }
1261 #[tracing::instrument(skip_all, target = "openiap::client")]
1262 async fn send_envelope(&self, mut envelope: Envelope) -> Result<(), OpenIAPError> {
1263 if (self.get_state() != ClientState::Connected && self.get_state() != ClientState::Signedin )
1264 && envelope.command != "signin" && envelope.command != "getelement" && envelope.command != "pong" {
1265 return Err(OpenIAPError::ClientError(format!("Not connected ( {:?} )", self.get_state())));
1266 }
1267 let env = envelope.clone();
1268 let command = envelope.command.clone();
1269 self.stats.lock().unwrap().package_tx += 1;
1270 match command.as_str() {
1271 "signin" => { self.stats.lock().unwrap().signin += 1;},
1272 "upload" => { self.stats.lock().unwrap().upload += 1;},
1273 "download" => { self.stats.lock().unwrap().download += 1;},
1274 "getdocumentversion" => { self.stats.lock().unwrap().getdocumentversion += 1;},
1275 "customcommand" => { self.stats.lock().unwrap().customcommand += 1;},
1276 "listcollections" => { self.stats.lock().unwrap().listcollections += 1;},
1277 "createcollection" => { self.stats.lock().unwrap().createcollection += 1;},
1278 "dropcollection" => { self.stats.lock().unwrap().dropcollection += 1;},
1279 "ensurecustomer" => { self.stats.lock().unwrap().ensurecustomer += 1;},
1280 "invokeopenrpa" => { self.stats.lock().unwrap().invokeopenrpa += 1;},
1281
1282 "registerqueue" => { self.stats.lock().unwrap().registerqueue += 1;},
1283 "registerexchange" => { self.stats.lock().unwrap().registerexchange += 1;},
1284 "unregisterqueue" => { self.stats.lock().unwrap().unregisterqueue += 1;},
1285 "watch" => { self.stats.lock().unwrap().watch += 1;},
1286 "unwatch" => { self.stats.lock().unwrap().unwatch += 1;},
1287 "queuemessage" => { self.stats.lock().unwrap().queuemessage += 1;},
1288
1289 "pushworkitem" => { self.stats.lock().unwrap().pushworkitem += 1;},
1290 "pushworkitems" => { self.stats.lock().unwrap().pushworkitems += 1;},
1291 "popworkitem" => { self.stats.lock().unwrap().popworkitem += 1;},
1292 "updateworkitem" => { self.stats.lock().unwrap().updateworkitem += 1;},
1293 "deleteworkitem" => { self.stats.lock().unwrap().deleteworkitem += 1;},
1294 "addworkitemqueue" => { self.stats.lock().unwrap().addworkitemqueue += 1;},
1295 "updateworkitemqueue" => { self.stats.lock().unwrap().updateworkitemqueue += 1;},
1296 "deleteworkitemqueue" => { self.stats.lock().unwrap().deleteworkitemqueue += 1;},
1297
1298 "getindexes" => { self.stats.lock().unwrap().getindexes += 1;},
1299 "createindex" => { self.stats.lock().unwrap().createindex += 1;},
1300 "dropindex" => { self.stats.lock().unwrap().dropindex += 1;},
1301 "query" => { self.stats.lock().unwrap().query += 1;},
1302 "count" => { self.stats.lock().unwrap().count += 1;},
1303 "distinct" => { self.stats.lock().unwrap().distinct += 1;},
1304 "aggregate" => { self.stats.lock().unwrap().aggregate += 1;},
1305 "insertone" => { self.stats.lock().unwrap().insertone += 1;},
1306 "insertmany" => { self.stats.lock().unwrap().insertmany += 1;},
1307 "updateone" => { self.stats.lock().unwrap().updateone += 1;},
1308 "insertorupdateone" => { self.stats.lock().unwrap().insertorupdateone += 1;},
1309 "insertorupdatemany" => { self.stats.lock().unwrap().insertorupdatemany += 1;},
1310 "updatedocument" => { self.stats.lock().unwrap().updatedocument += 1;},
1311 "deleteone" => { self.stats.lock().unwrap().deleteone += 1;},
1312 "deletemany" => { self.stats.lock().unwrap().deletemany += 1;},
1313 _ => {}
1314 };
1315 if envelope.id.is_empty() {
1316 let id = Client::get_uniqueid();
1317 envelope.id = id.clone();
1318 }
1319 trace!("Sending {} message, in the thread", command);
1320 let res = self.out_envelope_sender.send(env).await;
1321 if res.is_err() {
1322 error!("{:?}", res);
1323 let errmsg = res.unwrap_err().to_string();
1324 self.set_connected(ClientState::Disconnected, Some(&errmsg));
1325 return Err(OpenIAPError::ClientError(format!("Failed to send data: {}", errmsg)))
1326 } else {
1327 return Ok(())
1328 }
1329 }
1330 #[tracing::instrument(skip_all, target = "openiap::client")]
1331 async fn parse_incomming_envelope(&self, received: Envelope) {
1332 self.stats.lock().unwrap().package_rx += 1;
1333 let command = received.command.clone();
1334 trace!("parse_incomming_envelope, command: {}", command);
1335 let inner = self.inner.lock().await;
1336 let rid = received.rid.clone();
1337 let mut queries = inner.queries.lock().await;
1338 let mut streams = inner.streams.lock().await;
1339 let watches = inner.watches.lock().await;
1340 let queues = inner.queues.lock().await;
1341
1342 if command != "ping" && command != "pong" && command != "refreshtoken" {
1343 if rid.is_empty() {
1344 debug!("Received #{} #{} {} message", received.seq, received.id, command);
1345 } else {
1346 debug!("Received #{} #{} (reply to #{}) {} message", received.seq, received.id, rid, command);
1347 }
1348 } else if rid.is_empty() {
1349 trace!("Received #{} #{} {} message", received.seq, received.id, command);
1350 } else {
1351 trace!("Received #{} #{} (reply to #{}) {} message", received.seq, received.id, rid, command);
1352 }
1353
1354 if command == "ping" {
1355 self.pong(&received.id).await;
1356 } else if command == "refreshtoken" {
1358 } else if command == "beginstream"
1360 || command == "stream"
1361 || command == "endstream"
1362 {
1363 let streamresponse: Stream =
1364 prost::Message::decode(received.data.unwrap().value.as_ref()).unwrap();
1365 let streamdata = streamresponse.data;
1366 if !streamdata.is_empty() {
1367 let stream = streams.get(rid.as_str()).unwrap();
1368
1369 match stream.send(streamdata).await {
1370 Ok(_) => _ = (),
1371 Err(e) => error!("Failed to send data: {}", e),
1372 }
1373 }
1374 if command == "endstream" {
1375 let _ = streams.remove(rid.as_str());
1376 }
1377 } else if command == "watchevent" {
1378 let watchevent: WatchEvent =
1379 prost::Message::decode(received.data.unwrap().value.as_ref()).unwrap();
1380 if let Some(callback) = watches.get(watchevent.id.as_str()) {
1381 callback(watchevent);
1382 }
1383 } else if command == "queueevent" {
1384 let queueevent: QueueEvent =
1385 prost::Message::decode(received.data.unwrap().value.as_ref()).unwrap();
1386 if let Some(callback) = queues.get(queueevent.queuename.as_str()) {
1387 callback(queueevent);
1388 }
1389 } else if let Some(response_tx) = queries.remove(&rid) {
1390 let stream = streams.get(rid.as_str());
1391 if let Some(stream) = stream {
1392 let streamdata = vec![];
1393 match stream.send(streamdata).await {
1394 Ok(_) => _ = (),
1395 Err(e) => error!("Failed to send data: {}", e),
1396 }
1397 }
1398 let _ = response_tx.send(received);
1399 } else {
1400 error!("Received unhandled {} message: {:?}", command, received);
1401 }
1402 }
1403 #[tracing::instrument(skip_all)]
1405 async fn get_element(&self) -> Result<(), OpenIAPError> {
1406 let id = Client::get_uniqueid();
1407 let envelope = Envelope {
1408 id: id.clone(),
1409 command: "getelement".into(),
1410 ..Default::default()
1411 };
1412 let result = match self.send(envelope).await {
1413 Ok(res) => res,
1414 Err(e) => {
1415 return Err(e);
1416 },
1417 };
1418 if result.command == "pong" || result.command == "getelement" {
1419 Ok(())
1420 } else if result.command == "error" {
1421 let e: ErrorResponse = prost::Message::decode(result.data.unwrap().value.as_ref()).unwrap();
1422 Err(OpenIAPError::ServerError(e.message))
1423 } else {
1424 Err(OpenIAPError::ClientError("Failed to receive getelement".to_string()))
1425 }
1426 }
1427 #[tracing::instrument(skip_all)]
1429 async fn ping(&self) -> Result<(), OpenIAPError> {
1430 let id = Client::get_uniqueid();
1431 let envelope = Envelope {
1432 id: id.clone(),
1433 command: "getelement".into(),
1434 ..Default::default()
1435 };
1436 match self.send_envelope(envelope).await {
1437 Ok(_res) => Ok(()),
1438 Err(e) => {
1439 return Err(e);
1440 },
1441 }
1442 }
1443 #[tracing::instrument(skip_all)]
1445 async fn pong(&self, rid: &str) {
1446 let id = Client::get_uniqueid();
1447 let envelope = Envelope {
1448 id: id.clone(),
1449 command: "pong".into(),
1450 rid: rid.to_string(),
1451 ..Default::default()
1452 };
1453 match self.send_envelope(envelope).await {
1454 Ok(_) => (),
1455 Err(e) => error!("Failed to send pong: {}", e),
1456 }
1457 }
1458 #[tracing::instrument(skip_all)]
1464 pub async fn signin(&self, mut config: SigninRequest) -> Result<SigninResponse, OpenIAPError> {
1465 if config.username.is_empty() && config.password.is_empty() && config.jwt.is_empty() {
1467 if config.jwt.is_empty() {
1468 config.jwt = std::env::var("OPENIAP_JWT").unwrap_or_default();
1469 }
1470 if config.jwt.is_empty() {
1471 config.jwt = std::env::var("jwt").unwrap_or_default();
1472 }
1473 if config.jwt.is_empty() {
1475 if config.username.is_empty() {
1476 config.username = std::env::var("OPENIAP_USERNAME").unwrap_or_default();
1477 }
1478 if config.password.is_empty() {
1479 config.password = std::env::var("OPENIAP_PASSWORD").unwrap_or_default();
1480 }
1481 }
1482 }
1483 let version = env!("CARGO_PKG_VERSION");
1484 if !version.is_empty() && config.version.is_empty() {
1485 config.version = version.to_string();
1486 }
1487 if config.agent.is_empty() {
1488 config.agent = self.get_agent_name();
1489 }
1490
1491 debug!("Attempting sign-in using {:?}", config);
1492 let envelope = config.to_envelope();
1493 let result = self.send(envelope).await;
1494
1495 match &result {
1496 Ok(m) => {
1497 debug!("Sign-in reply received");
1498 if m.command == "error" {
1499 let e: ErrorResponse =
1500 prost::Message::decode(m.data.as_ref().unwrap().value.as_ref())
1501 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1502 return Err(OpenIAPError::ServerError(e.message));
1503 }
1504 debug!("Sign-in successful");
1505 let response: SigninResponse =
1506 prost::Message::decode(m.data.as_ref().unwrap().value.as_ref())
1507 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1508 if !config.validateonly {
1509 self.set_connected(ClientState::Signedin, None);
1510 self.set_user(Some(response.user.as_ref().unwrap().clone()));
1511 }
1512 Ok(response)
1513 }
1514 Err(e) => {
1515 debug!("Sending Sign-in request failed {:?}", result);
1516 debug!("Sign-in failed: {}", e.to_string());
1517 if !config.validateonly {
1518 self.set_user(None);
1519 }
1520 Err(OpenIAPError::ClientError(e.to_string()))
1521 }
1522 }
1523 }
1524 #[tracing::instrument(skip_all)]
1528 pub async fn list_collections(&self, includehist: bool) -> Result<String, OpenIAPError> {
1529 let config = ListCollectionsRequest::new(includehist);
1530 let envelope = config.to_envelope();
1531 let result = self.send(envelope).await;
1532 match result {
1533 Ok(m) => {
1534 let data = match m.data {
1535 Some(data) => data,
1536 None => {
1537 return Err(OpenIAPError::ClientError("No data returned".to_string()));
1538 }
1539 };
1540 if m.command == "error" {
1541 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1542 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1543 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1544 }
1545 let response: ListCollectionsResponse = prost::Message::decode(data.value.as_ref())
1546 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1547 Ok(response.results)
1548 }
1549 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1550 }
1551 }
1552 #[tracing::instrument(skip_all)]
1604 pub async fn create_collection(
1605 &self,
1606 config: CreateCollectionRequest,
1607 ) -> Result<(), OpenIAPError> {
1608 if config.collectionname.is_empty() {
1609 return Err(OpenIAPError::ClientError(
1610 "No collection name provided".to_string(),
1611 ));
1612 }
1613 let envelope = config.to_envelope();
1614 let result = self.send(envelope).await;
1615 match result {
1616 Ok(m) => {
1617 let data = match m.data {
1618 Some(data) => data,
1619 None => {
1620 return Err(OpenIAPError::ClientError("No data returned".to_string()));
1621 }
1622 };
1623 if m.command == "error" {
1624 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1625 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1626 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1627 }
1628 Ok(())
1629 }
1630 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1631 }
1632 }
1633 #[tracing::instrument(skip_all)]
1636 pub async fn drop_collection(&self, config: DropCollectionRequest) -> Result<(), OpenIAPError> {
1637 if config.collectionname.is_empty() {
1638 return Err(OpenIAPError::ClientError(
1639 "No collection name provided".to_string(),
1640 ));
1641 }
1642 let envelope = config.to_envelope();
1643 let result = self.send(envelope).await;
1644 match result {
1645 Ok(m) => {
1646 let data = match m.data {
1647 Some(data) => data,
1648 None => {
1649 return Err(OpenIAPError::ClientError("No data returned".to_string()));
1650 }
1651 };
1652 if m.command == "error" {
1653 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1654 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1655 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1656 }
1657 Ok(())
1658 }
1659 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1660 }
1661 }
1662 pub async fn get_indexes(&self, config: GetIndexesRequest) -> Result<String, OpenIAPError> {
1676 if config.collectionname.is_empty() {
1677 return Err(OpenIAPError::ClientError(
1678 "No collection name provided".to_string(),
1679 ));
1680 }
1681 let envelope = config.to_envelope();
1682 let result = self.send(envelope).await;
1683 match result {
1684 Ok(m) => {
1685 let data = match m.data {
1686 Some(data) => data,
1687 None => {
1688 return Err(OpenIAPError::ClientError("No data returned".to_string()));
1689 }
1690 };
1691 if m.command == "error" {
1692 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1693 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1694 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1695 }
1696 let response: GetIndexesResponse = prost::Message::decode(data.value.as_ref())
1697 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1698 Ok(response.results)
1699 }
1700 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1701 }
1702 }
1703 pub async fn create_index(&self, config: CreateIndexRequest) -> Result<(), OpenIAPError> {
1744 if config.collectionname.is_empty() {
1745 return Err(OpenIAPError::ClientError(
1746 "No collection name provided".to_string(),
1747 ));
1748 }
1749 if config.index.is_empty() {
1750 return Err(OpenIAPError::ClientError(
1751 "No index was provided".to_string(),
1752 ));
1753 }
1754 let envelope = config.to_envelope();
1755 let result = self.send(envelope).await;
1756 match result {
1757 Ok(m) => {
1758 let data = match m.data {
1759 Some(data) => data,
1760 None => {
1761 return Err(OpenIAPError::ClientError("No data returned".to_string()));
1762 }
1763 };
1764 if m.command == "error" {
1765 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1766 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1767 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1768 }
1769 Ok(())
1770 }
1771 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1772 }
1773 }
1774 pub async fn drop_index(&self, config: DropIndexRequest) -> Result<(), OpenIAPError> {
1777 if config.collectionname.is_empty() {
1778 return Err(OpenIAPError::ClientError(
1779 "No collection name provided".to_string(),
1780 ));
1781 }
1782 if config.name.is_empty() {
1783 return Err(OpenIAPError::ClientError(
1784 "No index name provided".to_string(),
1785 ));
1786 }
1787 let envelope = config.to_envelope();
1788 let result = self.send(envelope).await;
1789 match result {
1790 Ok(m) => {
1791 let data = match m.data {
1792 Some(data) => data,
1793 None => {
1794 return Err(OpenIAPError::ClientError("No data returned".to_string()));
1795 }
1796 };
1797 if m.command == "error" {
1798 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1799 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1800 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1801 }
1802 Ok(())
1803 }
1804 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
1805 }
1806 }
1807 #[tracing::instrument(skip_all)]
1845 pub async fn query(&self, mut config: QueryRequest) -> Result<QueryResponse, OpenIAPError> {
1846 if config.collectionname.is_empty() {
1847 config.collectionname = "entities".to_string();
1848 }
1849
1850 let envelope = config.to_envelope();
1851 let result = self.send(envelope).await;
1852 match result {
1853 Ok(m) => {
1854 let data = match m.data {
1855 Some(data) => data,
1856 None => {
1857 return Err(OpenIAPError::ClientError("No data returned".to_string()));
1858 }
1859 };
1860 if m.command == "error" {
1861 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
1862 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1863 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
1864 }
1865 let response: QueryResponse = prost::Message::decode(data.value.as_ref())
1866 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
1867 debug!("Return Ok(response)");
1868 Ok(response)
1869 }
1870 Err(e) => {
1871 debug!("Error !!");
1872 Err(OpenIAPError::ClientError(e.to_string()))
1873 }
1874 }
1875 }
1876 #[tracing::instrument(skip_all)]
1902 pub async fn get_one(&self, mut config: QueryRequest) -> Option<serde_json::Value> {
1903 if config.collectionname.is_empty() {
1904 config.collectionname = "entities".to_string();
1905 }
1906 config.top = 1;
1907 let envelope = config.to_envelope();
1908 let result = self.send(envelope).await;
1909 match result {
1910 Ok(m) => {
1911 let data = match m.data {
1912 Some(data) => data,
1913 None => return None,
1914 };
1915 if m.command == "error" {
1916 return None;
1917 }
1918 let response: QueryResponse = prost::Message::decode(data.value.as_ref()).ok()?;
1919
1920 let items: serde_json::Value = serde_json::from_str(&response.results).unwrap();
1921 let items: &Vec<serde_json::Value> = items.as_array().unwrap();
1922 if items.is_empty() {
1923 return None;
1924 }
1925 let item = items[0].clone();
1926 Some(item)
1927 }
1928 Err(_) => None,
1929 }
1930 }
1931
1932 #[tracing::instrument(skip_all)]
2042 pub async fn get_document_version(
2043 &self,
2044 mut config: GetDocumentVersionRequest,
2045 ) -> Result<String, OpenIAPError> {
2046 if config.collectionname.is_empty() {
2047 config.collectionname = "entities".to_string();
2048 }
2049 if config.id.is_empty() {
2050 return Err(OpenIAPError::ClientError("No id provided".to_string()));
2051 }
2052 let envelope = config.to_envelope();
2053 let result = self.send(envelope).await;
2054 match result {
2055 Ok(m) => {
2056 let data = match m.data {
2057 Some(data) => data,
2058 None => {
2059 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2060 }
2061 };
2062 if m.command == "error" {
2063 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2064 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2065 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2066 }
2067 let response: GetDocumentVersionResponse =
2068 prost::Message::decode(data.value.as_ref())
2069 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2070 Ok(response.result)
2071 }
2072 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2073 }
2074 }
2075 #[tracing::instrument(skip_all)]
2094 pub async fn aggregate(
2095 &self,
2096 mut config: AggregateRequest,
2097 ) -> Result<AggregateResponse, OpenIAPError> {
2098 if config.collectionname.is_empty() {
2099 config.collectionname = "entities".to_string();
2100 }
2101 if config.hint.is_empty() {
2102 config.hint = "".to_string();
2103 }
2104 if config.queryas.is_empty() {
2105 config.queryas = "".to_string();
2106 }
2107 if config.aggregates.is_empty() {
2108 return Err(OpenIAPError::ClientError(
2109 "No aggregates provided".to_string(),
2110 ));
2111 }
2112 let envelope = config.to_envelope();
2113 let result = self.send(envelope).await;
2114 match result {
2115 Ok(m) => {
2116 let data = match m.data {
2117 Some(data) => data,
2118 None => {
2119 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2120 }
2121 };
2122 if m.command == "error" {
2123 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2124 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2125 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2126 }
2127 let response: AggregateResponse = prost::Message::decode(data.value.as_ref())
2128 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2129 Ok(response)
2130 }
2131 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2132 }
2133 }
2134 #[tracing::instrument(skip_all)]
2136 pub async fn count(&self, mut config: CountRequest) -> Result<CountResponse, OpenIAPError> {
2137 if config.collectionname.is_empty() {
2138 config.collectionname = "entities".to_string();
2139 }
2140 if config.query.is_empty() {
2141 config.query = "{}".to_string();
2142 }
2143 let envelope = config.to_envelope();
2144 let result = self.send(envelope).await;
2145 match result {
2146 Ok(m) => {
2147 let data = match m.data {
2148 Some(data) => data,
2149 None => {
2150 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2151 }
2152 };
2153 if m.command == "error" {
2154 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2155 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2156 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2157 }
2158 let response: CountResponse = prost::Message::decode(data.value.as_ref())
2159 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2160 Ok(response)
2161 }
2162 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2163 }
2164 }
2165 #[tracing::instrument(skip_all)]
2167 pub async fn distinct(
2168 &self,
2169 mut config: DistinctRequest,
2170 ) -> Result<DistinctResponse, OpenIAPError> {
2171 if config.collectionname.is_empty() {
2172 config.collectionname = "entities".to_string();
2173 }
2174 if config.query.is_empty() {
2175 config.query = "{}".to_string();
2176 }
2177 if config.field.is_empty() {
2178 return Err(OpenIAPError::ClientError("No field provided".to_string()));
2179 }
2180 let envelope = config.to_envelope();
2181 let result = self.send(envelope).await;
2182 match result {
2183 Ok(m) => {
2184 let data = match m.data {
2185 Some(data) => data,
2186 None => {
2187 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2188 }
2189 };
2190 if m.command == "error" {
2191 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2192 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2193 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2194 }
2195 let response: DistinctResponse = prost::Message::decode(data.value.as_ref())
2196 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2197 Ok(response)
2198 }
2199 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2200 }
2201 }
2202 #[tracing::instrument(skip_all)]
2204 pub async fn insert_one(
2205 &self,
2206 config: InsertOneRequest,
2207 ) -> Result<InsertOneResponse, OpenIAPError> {
2208 let envelope = config.to_envelope();
2209 let result = self.send(envelope).await;
2210 match result {
2211 Ok(m) => {
2212 let data = match m.data {
2213 Some(data) => data,
2214 None => {
2215 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2216 }
2217 };
2218 if m.command == "error" {
2219 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2220 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2221 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2222 }
2223 let response: InsertOneResponse = prost::Message::decode(data.value.as_ref())
2224 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2225 Ok(response)
2226 }
2227 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2228 }
2229 }
2230 #[tracing::instrument(skip_all)]
2232 pub async fn insert_many(
2233 &self,
2234 config: InsertManyRequest,
2235 ) -> Result<InsertManyResponse, OpenIAPError> {
2236 let envelope = config.to_envelope();
2237 let result = self.send(envelope).await;
2238 match result {
2239 Ok(m) => {
2240 let data = match m.data {
2241 Some(data) => data,
2242 None => {
2243 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2244 }
2245 };
2246 if m.command == "error" {
2247 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2248 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2249 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2250 }
2251 let response: InsertManyResponse = prost::Message::decode(data.value.as_ref())
2252 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2253 Ok(response)
2254 }
2255 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2256 }
2257 }
2258 #[tracing::instrument(skip_all)]
2260 pub async fn update_one(
2261 &self,
2262 config: UpdateOneRequest,
2263 ) -> Result<UpdateOneResponse, OpenIAPError> {
2264 let envelope = config.to_envelope();
2265 let result = self.send(envelope).await;
2266 match result {
2267 Ok(m) => {
2268 let data = match m.data {
2269 Some(data) => data,
2270 None => {
2271 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2272 }
2273 };
2274 if m.command == "error" {
2275 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2276 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2277 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2278 }
2279 let response: UpdateOneResponse = prost::Message::decode(data.value.as_ref())
2280 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2281 Ok(response)
2282 }
2283 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2284 }
2285 }
2286 #[tracing::instrument(skip_all)]
2288 pub async fn insert_or_update_one(
2289 &self,
2290 config: InsertOrUpdateOneRequest,
2291 ) -> Result<String, OpenIAPError> {
2292 let envelope = config.to_envelope();
2293 let result = self.send(envelope).await;
2294 match result {
2295 Ok(m) => {
2296 let data = match m.data {
2297 Some(data) => data,
2298 None => {
2299 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2300 }
2301 };
2302 if m.command == "error" {
2303 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2304 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2305 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2306 }
2307 let response: InsertOrUpdateOneResponse =
2308 prost::Message::decode(data.value.as_ref())
2309 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2310 Ok(response.result)
2311 }
2312 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2313 }
2314 }
2315 #[tracing::instrument(skip_all)]
2317 pub async fn insert_or_update_many(
2318 &self,
2319 config: InsertOrUpdateManyRequest,
2320 ) -> Result<InsertOrUpdateManyResponse, OpenIAPError> {
2321 let envelope = config.to_envelope();
2322 let result = self.send(envelope).await;
2323 match result {
2324 Ok(m) => {
2325 let data = match m.data {
2326 Some(data) => data,
2327 None => {
2328 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2329 }
2330 };
2331 if m.command == "error" {
2332 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2333 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2334 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2335 }
2336 let response: InsertOrUpdateManyResponse =
2337 prost::Message::decode(data.value.as_ref())
2338 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2339 Ok(response)
2340 }
2341 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2342 }
2343 }
2344 #[tracing::instrument(skip_all)]
2346 pub async fn update_document(
2347 &self,
2348 config: UpdateDocumentRequest,
2349 ) -> Result<UpdateDocumentResponse, OpenIAPError> {
2350 let envelope = config.to_envelope();
2351 let result = self.send(envelope).await;
2352 match result {
2353 Ok(m) => {
2354 let data = match m.data {
2355 Some(data) => data,
2356 None => {
2357 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2358 }
2359 };
2360 if m.command == "error" {
2361 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2362 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2363 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2364 }
2365 let response: UpdateDocumentResponse = prost::Message::decode(data.value.as_ref())
2366 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2367 Ok(response)
2368 }
2369 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2370 }
2371 }
2372 #[tracing::instrument(skip_all)]
2374 pub async fn delete_one(&self, config: DeleteOneRequest) -> Result<i32, OpenIAPError> {
2375 let envelope = config.to_envelope();
2376 let result = self.send(envelope).await;
2377 match result {
2378 Ok(m) => {
2379 let data = match m.data {
2380 Some(data) => data,
2381 None => {
2382 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2383 }
2384 };
2385 if m.command == "error" {
2386 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2387 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2388 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2389 }
2390 let response: DeleteOneResponse = prost::Message::decode(data.value.as_ref())
2391 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2392 Ok(response.affectedrows)
2393 }
2394 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2395 }
2396 }
2397 #[tracing::instrument(skip_all)]
2399 pub async fn delete_many(&self, config: DeleteManyRequest) -> Result<i32, OpenIAPError> {
2400 let envelope = config.to_envelope();
2401 let result = self.send(envelope).await;
2402 match result {
2403 Ok(m) => {
2404 let data = match m.data {
2405 Some(data) => data,
2406 None => {
2407 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2408 }
2409 };
2410 if m.command == "error" {
2411 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2412 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2413 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2414 }
2415 let response: DeleteManyResponse = prost::Message::decode(data.value.as_ref())
2416 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2417 Ok(response.affectedrows)
2418 }
2419 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2420 }
2421 }
2422 #[tracing::instrument(skip_all)]
2424 pub async fn download(
2425 &self,
2426 config: DownloadRequest,
2427 folder: Option<&str>,
2428 filename: Option<&str>,
2429 ) -> Result<DownloadResponse, OpenIAPError> {
2430 let envelope = config.to_envelope();
2431 match self.sendwithstream(envelope).await {
2432 Ok((response_rx, mut stream_rx)) => {
2433 let temp_file_path = util::generate_unique_filename("openiap");
2434 debug!("Temp file: {:?}", temp_file_path);
2435 let mut temp_file = File::create(&temp_file_path).map_err(|e| {
2436 OpenIAPError::ClientError(format!("Failed to create temp file: {}", e))
2437 })?;
2438 while !stream_rx.is_closed() {
2439 match stream_rx.recv().await {
2440 Some(received) => {
2441 if received.is_empty() {
2442 debug!("Stream closed");
2443 break;
2444 }
2445 debug!("Received {} bytes", received.len());
2446 temp_file.write_all(&received).map_err(|e| {
2447 OpenIAPError::ClientError(format!(
2448 "Failed to write to temp file: {}",
2449 e
2450 ))
2451 })?;
2452 }
2453 None => {
2454 debug!("Stream closed");
2455 break;
2456 }
2457 }
2458 }
2459 temp_file.sync_all().map_err(|e| {
2460 OpenIAPError::ClientError(format!("Failed to sync temp file: {}", e))
2461 })?;
2462
2463 let response = response_rx.await.map_err(|_| {
2464 OpenIAPError::ClientError("Failed to receive response".to_string())
2465 })?;
2466
2467 if response.command == "error" {
2468 let data = match response.data {
2469 Some(data) => data,
2470 None => {
2471 return Err(OpenIAPError::ClientError(
2472 "No data returned for SERVER error".to_string(),
2473 ));
2474 }
2475 };
2476 let e: ErrorResponse = prost::Message::decode(data.value.as_ref()).unwrap();
2477 return Err(OpenIAPError::ServerError(e.message));
2478 }
2479 let mut downloadresponse: DownloadResponse =
2480 prost::Message::decode(response.data.unwrap().value.as_ref()).unwrap();
2481
2482 let mut final_filename = match &filename {
2483 Some(f) => f,
2484 None => downloadresponse.filename.as_str(),
2485 };
2486 if final_filename.is_empty() {
2487 final_filename = downloadresponse.filename.as_str();
2488 }
2489 let mut folder = match &folder {
2490 Some(f) => f,
2491 None => ".",
2492 };
2493 if folder.is_empty() {
2494 folder = ".";
2495 }
2496 let filepath = format!("{}/{}", folder, final_filename);
2497 trace!("Moving file to {}", filepath);
2498 util::move_file(temp_file_path.to_str().unwrap(), filepath.as_str()).map_err(|e| {
2499 OpenIAPError::ClientError(format!("Failed to move file: {}", e))
2500 })?;
2501 debug!("Downloaded file to {}", filepath);
2502 downloadresponse.filename = filepath;
2503
2504 Ok(downloadresponse)
2505 }
2506 Err(status) => Err(OpenIAPError::ClientError(status.to_string())),
2507 }
2508 }
2509 #[tracing::instrument(skip_all)]
2511 pub async fn upload(
2512 &self,
2513 config: UploadRequest,
2514 filepath: &str,
2515 ) -> Result<UploadResponse, OpenIAPError> {
2516 debug!("upload: Uploading file: {}", filepath);
2575 let mut file = File::open(filepath)
2576 .map_err(|e| OpenIAPError::ClientError(format!("Failed to open file: {}", e)))?;
2577 let chunk_size = 1024 * 1024;
2578 let mut buffer = vec![0; chunk_size];
2579
2580 let envelope = config.to_envelope();
2582 let (response_rx, rid) = self.send_noawait(envelope).await?;
2583
2584 let envelope = BeginStream::from_rid(rid.clone());
2586 debug!("Sending beginstream to #{}", rid);
2587 if let Err(e) = self.send_envelope(envelope).await {
2588 let inner = self.inner.lock().await;
2589 inner.queries.lock().await.remove(&rid);
2590 return Err(OpenIAPError::ClientError(format!("Failed to send data: {}", e)));
2591 }
2592
2593 let mut counter = 0;
2595 loop {
2596 let bytes_read = file.read(&mut buffer).map_err(|e| {
2597 OpenIAPError::ClientError(format!("Failed to read from file: {}", e))
2598 })?;
2599 counter += 1;
2600
2601 if bytes_read == 0 {
2602 break;
2603 }
2604
2605 let chunk = buffer[..bytes_read].to_vec();
2606 let envelope = Stream::from_rid(chunk, rid.clone());
2607 debug!("Sending chunk {} stream to #{}", counter, envelope.rid);
2608 if let Err(e) = self.send_envelope(envelope).await {
2609 let inner = self.inner.lock().await;
2610 inner.queries.lock().await.remove(&rid);
2611 return Err(OpenIAPError::ClientError(format!("Failed to send data: {}", e)));
2612 }
2613 }
2614
2615 let envelope = EndStream::from_rid(rid.clone());
2617 debug!("Sending endstream to #{}", rid);
2618 if let Err(e) = self.send_envelope(envelope).await {
2619 let inner = self.inner.lock().await;
2620 inner.queries.lock().await.remove(&rid);
2621 return Err(OpenIAPError::ClientError(format!("Failed to send data: {}", e)));
2622 }
2623
2624 debug!("Wait for upload response for #{}", rid);
2626 let result = response_rx.await;
2627 let inner = self.inner.lock().await;
2628 inner.queries.lock().await.remove(&rid);
2629
2630 match result {
2631 Ok(response) => {
2632 if response.command == "error" {
2633 let error_response: ErrorResponse = prost::Message::decode(
2634 response.data.unwrap().value.as_ref(),
2635 )
2636 .map_err(|e| {
2637 OpenIAPError::ClientError(format!("Failed to decode ErrorResponse: {}", e))
2638 })?;
2639 return Err(OpenIAPError::ServerError(error_response.message));
2640 }
2641 let upload_response: UploadResponse =
2642 prost::Message::decode(response.data.unwrap().value.as_ref()).map_err(|e| {
2643 OpenIAPError::ClientError(format!("Failed to decode UploadResponse: {}", e))
2644 })?;
2645 Ok(upload_response)
2646 }
2647 Err(e) => Err(OpenIAPError::CustomError(e.to_string())),
2648 }
2649 }
2650 #[tracing::instrument(skip_all)]
2652 pub async fn watch(
2653 &self,
2654 mut config: WatchRequest,
2655 callback: Box<dyn Fn(WatchEvent) + Send + Sync>,
2656 ) -> Result<String, OpenIAPError> {
2657 if config.collectionname.is_empty() {
2658 config.collectionname = "entities".to_string();
2659 }
2660 if config.paths.is_empty() {
2661 config.paths = vec!["".to_string()];
2662 }
2663
2664 let envelope = config.to_envelope();
2665 let result = self.send(envelope).await;
2666 match result {
2667 Ok(m) => {
2668 let data = match m.data {
2669 Some(data) => data,
2670 None => {
2671 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2672 }
2673 };
2674 if m.command == "error" {
2675 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2676 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2677 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2678 }
2679 let response: WatchResponse = prost::Message::decode(data.value.as_ref())
2680 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2681
2682 let inner = self.inner.lock().await;
2683 inner
2684 .watches
2685 .lock()
2686 .await
2687 .insert(response.id.clone(), callback);
2688
2689 Ok(response.id)
2690 }
2691 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2692 }
2693 }
2694 #[tracing::instrument(skip_all)]
2696 pub async fn unwatch(&self, id: &str) -> Result<(), OpenIAPError> {
2697 let config = UnWatchRequest::byid(id);
2698 let envelope = config.to_envelope();
2699 let result = self.send(envelope).await;
2700 match result {
2701 Ok(m) => {
2702 let data = match m.data {
2703 Some(data) => data,
2704 None => {
2705 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2706 }
2707 };
2708 if m.command == "error" {
2709 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2710 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2711 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2712 }
2713 Ok(())
2714 }
2715 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2716 }
2717 }
2718 #[tracing::instrument(skip_all)]
2720 pub async fn register_queue(
2721 &self,
2722 mut config: RegisterQueueRequest,
2723 callback: Box<dyn Fn(QueueEvent) + Send + Sync>,
2724 ) -> Result<String, OpenIAPError> {
2725 if config.queuename.is_empty() {
2726 config.queuename = "".to_string();
2727 }
2728
2729 let envelope = config.to_envelope();
2730 let result = self.send(envelope).await;
2731 match result {
2732 Ok(m) => {
2733 let data = match m.data {
2734 Some(data) => data,
2735 None => {
2736 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2737 }
2738 };
2739 if m.command == "error" {
2740 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2741 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2742 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2743 }
2744 let response: RegisterQueueResponse =
2745 prost::Message::decode(data.value.as_ref())
2746 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2747
2748 let inner = self.inner.lock().await;
2749 inner
2750 .queues
2751 .lock()
2752 .await
2753 .insert(response.queuename.clone(), callback);
2754
2755 Ok(response.queuename)
2756 }
2757 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2758 }
2759 }
2760 #[tracing::instrument(skip_all)]
2762 pub async fn unregister_queue(&self, queuename: &str) -> Result<(), OpenIAPError> {
2763 let config = UnRegisterQueueRequest::byqueuename(queuename);
2764 let envelope = config.to_envelope();
2765 let result = self.send(envelope).await;
2766 match result {
2767 Ok(m) => {
2768 let data = match m.data {
2769 Some(data) => data,
2770 None => {
2771 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2772 }
2773 };
2774 if m.command == "error" {
2775 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2776 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2777 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2778 }
2779 Ok(())
2780 }
2781 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2782 }
2783 }
2784 #[tracing::instrument(skip_all)]
2786 pub async fn register_exchange(
2787 &self,
2788 mut config: RegisterExchangeRequest,
2789 callback: Box<dyn Fn(QueueEvent) + Send + Sync>,
2790 ) -> Result<String, OpenIAPError> {
2791 if config.exchangename.is_empty() {
2792 return Err(OpenIAPError::ClientError(
2793 "No exchange name provided".to_string(),
2794 ));
2795 }
2796 if config.algorithm.is_empty() {
2797 config.algorithm = "fanout".to_string();
2798 }
2799 let envelope = config.to_envelope();
2800 let result = self.send(envelope).await;
2801 match result {
2802 Ok(m) => {
2803 let data = match m.data {
2804 Some(data) => data,
2805 None => {
2806 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2807 }
2808 };
2809 if m.command == "error" {
2810 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2811 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2812 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2813 }
2814 let response: RegisterExchangeResponse =
2815 prost::Message::decode(data.value.as_ref())
2816 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2817 if !response.queuename.is_empty() {
2818 let inner = self.inner.lock().await;
2819 inner
2820 .queues
2821 .lock()
2822 .await
2823 .insert(response.queuename.clone(), callback);
2824 }
2825 Ok(response.queuename)
2826 }
2827 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2828 }
2829 }
2830 #[tracing::instrument(skip_all)]
2832 pub async fn queue_message(
2833 &self,
2834 config: QueueMessageRequest,
2835 ) -> Result<QueueMessageResponse, OpenIAPError> {
2836 if config.queuename.is_empty() && config.exchangename.is_empty() {
2837 return Err(OpenIAPError::ClientError(
2838 "No queue or exchange name provided".to_string(),
2839 ));
2840 }
2841 let envelope = config.to_envelope();
2842 let result = self.send(envelope).await;
2843 match result {
2844 Ok(m) => {
2845 let data = match m.data {
2846 Some(d) => d,
2847 None => {
2848 return Err(OpenIAPError::ClientError("No data in response".to_string()))
2849 }
2850 };
2851 if m.command == "error" {
2852 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2853 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2854 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2855 }
2856 let response: QueueMessageResponse = prost::Message::decode(data.value.as_ref())
2857 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2858 Ok(response)
2859 }
2860 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2861 }
2862 }
2863 #[tracing::instrument(skip_all)]
2865 pub async fn rpc(&self, mut config: QueueMessageRequest) -> Result<String, OpenIAPError> {
2866 if config.queuename.is_empty() && config.exchangename.is_empty() {
2867 return Err(OpenIAPError::ClientError(
2868 "No queue or exchange name provided".to_string(),
2869 ));
2870 }
2871
2872 let (tx, rx) = oneshot::channel::<String>();
2873 let tx = Arc::new(std::sync::Mutex::new(Some(tx)));
2874
2875 let q = self
2876 .register_queue(
2877 RegisterQueueRequest {
2878 queuename: "".to_string(),
2879 },
2880 Box::new(move |event| {
2881 let tx = tx.lock().unwrap().take().unwrap();
2882 tx.send(event.data).unwrap();
2883 }),
2884 )
2885 .await
2886 .unwrap();
2887
2888 config.replyto = q.clone();
2889 let envelope = config.to_envelope();
2890
2891 let result = self.send(envelope).await;
2892 match result {
2893 Ok(m) => {
2894 let data = match m.data {
2895 Some(d) => d,
2896 None => {
2897 return Err(OpenIAPError::ClientError("No data in response".to_string()))
2898 }
2899 };
2900 if m.command == "error" {
2901 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
2902 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
2903 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
2904 }
2905 let response = rx.await.unwrap();
2909
2910 let ur_response = self.unregister_queue(&q).await;
2911 match ur_response {
2912 Ok(_) => {
2913 debug!("Unregistered Response Queue: {:?}", q);
2914 }
2915 Err(e) => {
2916 error!("Failed to unregister Response Queue: {:?}", e);
2917 }
2918 }
2919
2920 Ok(response)
2921 }
2922 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
2923 }
2924 }
2925 #[tracing::instrument(skip_all)]
2929 pub async fn push_workitem(
2930 &self,
2931 mut config: PushWorkitemRequest,
2932 ) -> Result<PushWorkitemResponse, OpenIAPError> {
2933 if config.wiq.is_empty() && config.wiqid.is_empty() {
2934 return Err(OpenIAPError::ClientError(
2935 "No queue name or id provided".to_string(),
2936 ));
2937 }
2938 for f in &mut config.files {
2939 if f.filename.is_empty() && f.file.is_empty() {
2940 debug!("Filename is empty");
2941 } else if !f.filename.is_empty() && f.file.is_empty() && f.id.is_empty() {
2942 if !std::path::Path::new(&f.filename).exists() {
2944 debug!("File does not exist: {}", f.filename);
2945 } else {
2946 let filesize = std::fs::metadata(&f.filename).unwrap().len();
2947 if filesize < 5 * 1024 * 1024 {
2949 debug!("File {} exists so ATTACHING it.", f.filename);
2950 let filename = std::path::Path::new(&f.filename)
2951 .file_name()
2952 .unwrap()
2953 .to_str()
2954 .unwrap();
2955 f.file = std::fs::read(&f.filename).unwrap();
2956 f.file = util::compress_file_to_vec(&f.filename).unwrap();
2959 f.compressed = true;
2960 f.filename = filename.to_string();
2961 f.id = "findme".to_string();
2962 trace!(
2963 "File {} was read and assigned to f.file, size: {}",
2964 f.filename,
2965 f.file.len()
2966 );
2967 } else {
2968 debug!("File {} exists so UPLOADING it.", f.filename);
2969 let filename = std::path::Path::new(&f.filename)
2970 .file_name()
2971 .unwrap()
2972 .to_str()
2973 .unwrap();
2974 let uploadconfig = UploadRequest {
2975 filename: filename.to_string(),
2976 collectionname: "fs.files".to_string(),
2977 ..Default::default()
2978 };
2979 let uploadresult = self.upload(uploadconfig, &f.filename).await.unwrap();
2980 trace!("File {} was upload as {}", filename, uploadresult.id);
2981 f.id = uploadresult.id.clone();
2983 f.filename = filename.to_string();
2984 }
2985 }
2986 } else {
2987 debug!("File {} is already uploaded", f.filename);
2988 }
2989 }
2990 let envelope = config.to_envelope();
2991 let result = self.send(envelope).await;
2992 match result {
2993 Ok(m) => {
2994 let data = match m.data {
2995 Some(data) => data,
2996 None => {
2997 return Err(OpenIAPError::ClientError("No data returned".to_string()));
2998 }
2999 };
3000 if m.command == "error" {
3001 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3002 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3003 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3004 }
3005 let response: PushWorkitemResponse = prost::Message::decode(data.value.as_ref())
3006 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3007 Ok(response)
3008 }
3009 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3010 }
3011 }
3012 #[tracing::instrument(skip_all)]
3016 pub async fn push_workitems(
3017 &self,
3018 mut config: PushWorkitemsRequest,
3019 ) -> Result<PushWorkitemsResponse, OpenIAPError> {
3020 if config.wiq.is_empty() && config.wiqid.is_empty() {
3021 return Err(OpenIAPError::ClientError(
3022 "No queue name or id provided".to_string(),
3023 ));
3024 }
3025 for wi in &mut config.items {
3026 for f in &mut wi.files {
3027 if f.filename.is_empty() && f.file.is_empty() {
3028 debug!("Filename is empty");
3029 } else if !f.filename.is_empty() && f.file.is_empty() && f.id.is_empty() {
3030 if !std::path::Path::new(&f.filename).exists() {
3032 debug!("File does not exist: {}", f.filename);
3033 } else {
3034 let filesize = std::fs::metadata(&f.filename).unwrap().len();
3035 if filesize < 5 * 1024 * 1024 {
3037 debug!("File {} exists so ATTACHING it.", f.filename);
3038 let filename = std::path::Path::new(&f.filename)
3039 .file_name()
3040 .unwrap()
3041 .to_str()
3042 .unwrap();
3043 f.file = std::fs::read(&f.filename).unwrap();
3044 f.file = util::compress_file_to_vec(&f.filename).unwrap();
3047 f.compressed = true;
3048 f.filename = filename.to_string();
3049 f.id = "findme".to_string();
3050 trace!(
3051 "File {} was read and assigned to f.file, size: {}",
3052 f.filename,
3053 f.file.len()
3054 );
3055 } else {
3056 debug!("File {} exists so UPLOADING it.", f.filename);
3057 let filename = std::path::Path::new(&f.filename)
3058 .file_name()
3059 .unwrap()
3060 .to_str()
3061 .unwrap();
3062 let uploadconfig = UploadRequest {
3063 filename: filename.to_string(),
3064 collectionname: "fs.files".to_string(),
3065 ..Default::default()
3066 };
3067 let uploadresult =
3068 self.upload(uploadconfig, &f.filename).await.unwrap();
3069 trace!("File {} was upload as {}", filename, uploadresult.id);
3070 f.id = uploadresult.id.clone();
3072 f.filename = filename.to_string();
3073 }
3074 }
3075 } else {
3076 debug!("File {} is already uploaded", f.filename);
3077 }
3078 }
3079 }
3080 let envelope = config.to_envelope();
3081 let result = self.send(envelope).await;
3082 match result {
3083 Ok(m) => {
3084 let data = match m.data {
3085 Some(data) => data,
3086 None => {
3087 return Err(OpenIAPError::ClientError("No data returned".to_string()));
3088 }
3089 };
3090 if m.command == "error" {
3091 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3092 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3093 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3094 }
3095 let response: PushWorkitemsResponse =
3096 prost::Message::decode(data.value.as_ref())
3097 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3098 Ok(response)
3099 }
3100 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3101 }
3102 }
3103 #[tracing::instrument(skip_all)]
3106 pub async fn pop_workitem(
3107 &self,
3108 config: PopWorkitemRequest,
3109 downloadfolder: Option<&str>,
3110 ) -> Result<PopWorkitemResponse, OpenIAPError> {
3111 if config.wiq.is_empty() && config.wiqid.is_empty() {
3112 return Err(OpenIAPError::ClientError(
3113 "No queue name or id provided".to_string(),
3114 ));
3115 }
3116 let envelope = config.to_envelope();
3117 let result = self.send(envelope).await;
3118 match result {
3119 Ok(m) => {
3120 let data = match m.data {
3121 Some(data) => data,
3122 None => {
3123 return Err(OpenIAPError::ClientError("No data returned".to_string()));
3124 }
3125 };
3126 if m.command == "error" {
3127 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3128 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3129 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3130 }
3131 let response: PopWorkitemResponse = prost::Message::decode(data.value.as_ref())
3132 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3133
3134 match &response.workitem {
3135 Some(wi) => {
3136 for f in &wi.files {
3137 if !f.id.is_empty() {
3138 let downloadconfig = DownloadRequest {
3139 id: f.id.clone(),
3140 collectionname: "fs.files".to_string(),
3141 ..Default::default()
3142 };
3143 let downloadresult =
3144 match self.download(downloadconfig, downloadfolder, None).await
3145 {
3146 Ok(r) => r,
3147 Err(e) => {
3148 debug!("Failed to download file: {}", e);
3149 continue;
3150 }
3151 };
3152 debug!(
3153 "File {} was downloaded as {}",
3154 f.filename, downloadresult.filename
3155 );
3156 }
3157 }
3158 }
3159 None => {
3160 debug!("No workitem found");
3161 }
3162 }
3163 Ok(response)
3164 }
3165 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3166 }
3167 }
3168 #[tracing::instrument(skip_all)]
3174 pub async fn update_workitem(
3175 &self,
3176 mut config: UpdateWorkitemRequest,
3177 ) -> Result<UpdateWorkitemResponse, OpenIAPError> {
3178 match &config.workitem {
3179 Some(wiq) => {
3180 if wiq.id.is_empty() {
3181 return Err(OpenIAPError::ClientError(
3182 "No workitem id provided".to_string(),
3183 ));
3184 }
3185 }
3186 None => {
3187 return Err(OpenIAPError::ClientError(
3188 "No workitem provided".to_string(),
3189 ));
3190 }
3191 }
3192 for f in &mut config.files {
3193 if f.filename.is_empty() && f.file.is_empty() {
3194 debug!("Filename is empty");
3195 } else if !f.filename.is_empty() && f.file.is_empty() && f.id.is_empty() {
3196 if !std::path::Path::new(&f.filename).exists() {
3197 debug!("File does not exist: {}", f.filename);
3198 } else {
3199 debug!("File {} exists so uploading it.", f.filename);
3200 let filename = std::path::Path::new(&f.filename)
3201 .file_name()
3202 .unwrap()
3203 .to_str()
3204 .unwrap();
3205 let uploadconfig = UploadRequest {
3206 filename: filename.to_string(),
3207 collectionname: "fs.files".to_string(),
3208 ..Default::default()
3209 };
3210 let uploadresult = self.upload(uploadconfig, &f.filename).await.unwrap();
3211 trace!("File {} was upload as {}", filename, uploadresult.id);
3212 f.id = uploadresult.id.clone();
3213 f.filename = filename.to_string();
3214 }
3215 } else {
3216 debug!("Skipped file");
3217 }
3218 }
3219 let envelope = config.to_envelope();
3220 let result = self.send(envelope).await;
3221 match result {
3222 Ok(m) => {
3223 let data = match m.data {
3224 Some(d) => d,
3225 None => {
3226 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3227 }
3228 };
3229 if m.command == "error" {
3230 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3231 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3232 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3233 }
3234 let response: UpdateWorkitemResponse = prost::Message::decode(data.value.as_ref())
3235 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3236 Ok(response)
3237 }
3238 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3239 }
3240 }
3241 #[tracing::instrument(skip_all)]
3243 pub async fn delete_workitem(
3244 &self,
3245 config: DeleteWorkitemRequest,
3246 ) -> Result<DeleteWorkitemResponse, OpenIAPError> {
3247 if config.id.is_empty() {
3248 return Err(OpenIAPError::ClientError(
3249 "No workitem id provided".to_string(),
3250 ));
3251 }
3252 let envelope = config.to_envelope();
3253 let result = self.send(envelope).await;
3254 match result {
3255 Ok(m) => {
3256 let data = match m.data {
3257 Some(d) => d,
3258 None => {
3259 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3260 }
3261 };
3262 if m.command == "error" {
3263 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3264 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3265 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3266 }
3267 let response: DeleteWorkitemResponse = prost::Message::decode(data.value.as_ref())
3268 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3269 Ok(response)
3270 }
3271 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3272 }
3273 }
3274 #[tracing::instrument(skip_all)]
3276 pub async fn add_workitem_queue(
3277 &self,
3278 config: AddWorkItemQueueRequest,
3279 ) -> Result<WorkItemQueue, OpenIAPError> {
3280 if config.workitemqueue.is_none() {
3281 return Err(OpenIAPError::ClientError(
3282 "No workitem queue name provided".to_string(),
3283 ));
3284 }
3285 let envelope = config.to_envelope();
3286 let result = self.send(envelope).await;
3287 match result {
3288 Ok(m) => {
3289 let data = match m.data {
3290 Some(d) => d,
3291 None => {
3292 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3293 }
3294 };
3295 if m.command == "error" {
3296 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3297 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3298 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3299 }
3300 let response: AddWorkItemQueueResponse =
3301 prost::Message::decode(data.value.as_ref())
3302 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3303 match response.workitemqueue {
3304 Some(wiq) => Ok(wiq),
3305 None => {
3306 return Err(OpenIAPError::ClientError(
3307 "No workitem queue returned".to_string(),
3308 ));
3309 }
3310 }
3311 }
3312 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3313 }
3314 }
3315 #[tracing::instrument(skip_all)]
3317 pub async fn update_workitem_queue(
3318 &self,
3319 config: UpdateWorkItemQueueRequest,
3320 ) -> Result<WorkItemQueue, OpenIAPError> {
3321 if config.workitemqueue.is_none() {
3322 return Err(OpenIAPError::ClientError(
3323 "No workitem queue name provided".to_string(),
3324 ));
3325 }
3326 let envelope = config.to_envelope();
3327 let result = self.send(envelope).await;
3328 match result {
3329 Ok(m) => {
3330 let data = match m.data {
3331 Some(d) => d,
3332 None => {
3333 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3334 }
3335 };
3336 if m.command == "error" {
3337 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3338 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3339 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3340 }
3341 let response: UpdateWorkItemQueueResponse =
3342 prost::Message::decode(data.value.as_ref())
3343 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3344 match response.workitemqueue {
3345 Some(wiq) => Ok(wiq),
3346 None => {
3347 return Err(OpenIAPError::ClientError(
3348 "No workitem queue returned".to_string(),
3349 ));
3350 }
3351 }
3352 }
3353 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3354 }
3355 }
3356 #[tracing::instrument(skip_all)]
3358 pub async fn delete_workitem_queue(
3359 &self,
3360 config: DeleteWorkItemQueueRequest,
3361 ) -> Result<(), OpenIAPError> {
3362 if config.wiq.is_empty() && config.wiqid.is_empty() {
3363 return Err(OpenIAPError::ClientError(
3364 "No workitem queue name or id provided".to_string(),
3365 ));
3366 }
3367 let envelope = config.to_envelope();
3368 let result = self.send(envelope).await;
3369 match result {
3370 Ok(m) => {
3371 let data = match m.data {
3372 Some(d) => d,
3373 None => {
3374 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3375 }
3376 };
3377 if m.command == "error" {
3378 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3379 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3380 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3381 }
3382 Ok(())
3383 }
3384 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3385 }
3386 }
3387 #[tracing::instrument(skip_all)]
3389 pub async fn custom_command(
3390 &self,
3391 config: CustomCommandRequest,
3392 ) -> Result<String, OpenIAPError> {
3393 if config.command.is_empty() {
3394 return Err(OpenIAPError::ClientError("No command provided".to_string()));
3395 }
3396 let envelope = config.to_envelope();
3397 let result = self.send(envelope).await;
3398 match result {
3399 Ok(m) => {
3400 let data = match m.data {
3401 Some(d) => d,
3402 None => {
3403 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3404 }
3405 };
3406 if m.command == "error" {
3407 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3408 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3409 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3410 }
3411 let response: CustomCommandResponse =
3412 prost::Message::decode(data.value.as_ref())
3413 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3414 Ok(response.result)
3415 }
3416 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3417 }
3418 }
3419 #[tracing::instrument(skip_all)]
3421 pub async fn delete_package(&self, packageid: &str) -> Result<(), OpenIAPError> {
3422 let config = DeletePackageRequest::byid(packageid);
3423 let envelope = config.to_envelope();
3424 let result = self.send(envelope).await;
3425 match result {
3426 Ok(m) => {
3427 let data = match m.data {
3428 Some(data) => data,
3429 None => {
3430 return Err(OpenIAPError::ClientError("No data returned".to_string()));
3431 }
3432 };
3433 if m.command == "error" {
3434 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3435 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3436 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3437 }
3438 Ok(())
3441 }
3442 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3443 }
3444 }
3445 #[tracing::instrument(skip_all)]
3447 pub async fn start_agent(&self, agentid: &str) -> Result<(), OpenIAPError> {
3448 let config = StartAgentRequest::byid(agentid);
3449 let envelope = config.to_envelope();
3450 let result = self.send(envelope).await;
3451 match result {
3452 Ok(m) => {
3453 let data = match m.data {
3454 Some(d) => d,
3455 None => {
3456 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3457 }
3458 };
3459 if m.command == "error" {
3460 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3461 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3462 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3463 }
3464 Ok(())
3467 }
3468 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3469 }
3470 }
3471 #[tracing::instrument(skip_all)]
3473 pub async fn stop_agent(&self, agentid: &str) -> Result<(), OpenIAPError> {
3474 let config = StopAgentRequest::byid(agentid);
3475 let envelope = config.to_envelope();
3476 let result = self.send(envelope).await;
3477 match result {
3478 Ok(m) => {
3479 let data = match m.data {
3480 Some(d) => d,
3481 None => {
3482 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3483 }
3484 };
3485 if m.command == "error" {
3486 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3487 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3488 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3489 }
3490 Ok(())
3493 }
3494 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3495 }
3496 }
3497 #[tracing::instrument(skip_all)]
3499 pub async fn delete_agent_pod(&self, agentid: &str, podname: &str) -> Result<(), OpenIAPError> {
3500 let config = DeleteAgentPodRequest::byid(agentid, podname);
3501 let envelope = config.to_envelope();
3502 let result = self.send(envelope).await;
3503 match result {
3504 Ok(m) => {
3505 let data = match m.data {
3506 Some(d) => d,
3507 None => {
3508 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3509 }
3510 };
3511 if m.command == "error" {
3512 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3513 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3514 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3515 }
3516 Ok(())
3519 }
3520 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3521 }
3522 }
3523 #[tracing::instrument(skip_all)]
3525 pub async fn delete_agent(&self, agentid: &str) -> Result<(), OpenIAPError> {
3526 let config = DeleteAgentRequest::byid(agentid);
3527 let envelope = config.to_envelope();
3528 let result = self.send(envelope).await;
3529 match result {
3530 Ok(m) => {
3531 let data = match m.data {
3532 Some(d) => d,
3533 None => {
3534 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3535 }
3536 };
3537 if m.command == "error" {
3538 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3539 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3540 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3541 }
3542 Ok(())
3545 }
3546 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3547 }
3548 }
3549 #[tracing::instrument(skip_all)]
3551 pub async fn get_agent_pods(&self, agentid: &str, stats: bool) -> Result<String, OpenIAPError> {
3552 let config = GetAgentPodsRequest::byid(agentid, stats);
3553 let envelope = config.to_envelope();
3554 let result = self.send(envelope).await;
3555 match result {
3556 Ok(m) => {
3557 let data = match m.data {
3558 Some(d) => d,
3559 None => {
3560 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3561 }
3562 };
3563 if m.command == "error" {
3564 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3565 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3566 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3567 }
3568 let response: GetAgentPodsResponse = prost::Message::decode(data.value.as_ref())
3569 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3570 Ok(response.results)
3571 }
3572 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3573 }
3574 }
3575 #[tracing::instrument(skip_all)]
3577 pub async fn get_agent_pod_logs(
3578 &self,
3579 agentid: &str,
3580 podname: &str,
3581 ) -> Result<String, OpenIAPError> {
3582 let config = GetAgentLogRequest::new(agentid, podname);
3583 let envelope = config.to_envelope();
3584 let result = self.send(envelope).await;
3585 match result {
3586 Ok(m) => {
3587 let data = match m.data {
3588 Some(d) => d,
3589 None => {
3590 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3591 }
3592 };
3593 if m.command == "error" {
3594 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3595 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3596 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3597 }
3598 let response: GetAgentLogResponse = prost::Message::decode(data.value.as_ref())
3599 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3600 Ok(response.result)
3601 }
3602 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3603 }
3604 }
3605
3606 #[tracing::instrument(skip_all)]
3609 pub async fn ensure_customer(
3610 &self,
3611 config: EnsureCustomerRequest,
3612 ) -> Result<EnsureCustomerResponse, OpenIAPError> {
3613 if config.customer.is_none() && config.stripe.is_none() {
3614 return Err(OpenIAPError::ClientError(
3615 "No customer or stripe provided".to_string(),
3616 ));
3617 }
3618 let envelope = config.to_envelope();
3619 let result = self.send(envelope).await;
3620 match result {
3621 Ok(m) => {
3622 let data = match m.data {
3623 Some(d) => d,
3624 None => {
3625 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3626 }
3627 };
3628 if m.command == "error" {
3629 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3630 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3631 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3632 }
3633 let response: EnsureCustomerResponse = prost::Message::decode(data.value.as_ref())
3634 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3635 Ok(response)
3636 }
3637 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3638 }
3639 }
3640 #[tracing::instrument(skip_all)]
3642 pub async fn create_workflow_instance(
3643 &self,
3644 config: CreateWorkflowInstanceRequest,
3645 ) -> Result<String, OpenIAPError> {
3646 if config.workflowid.is_empty() {
3647 return Err(OpenIAPError::ClientError(
3648 "No workflow id provided".to_string(),
3649 ));
3650 }
3651 let envelope = config.to_envelope();
3652 let result = self.send(envelope).await;
3653 match result {
3654 Ok(m) => {
3655 let data = match m.data {
3656 Some(d) => d,
3657 None => {
3658 return Err(OpenIAPError::ClientError("No data in response".to_string()));
3659 }
3660 };
3661 if m.command == "error" {
3662 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3663 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3664 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3665 }
3666 let response: CreateWorkflowInstanceResponse =
3667 prost::Message::decode(data.value.as_ref())
3668 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3669 Ok(response.instanceid)
3670 }
3671 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3672 }
3673 }
3674
3675 #[tracing::instrument(skip_all)]
3677 pub async fn invoke_openrpa(
3678 &self,
3679 config: InvokeOpenRpaRequest,
3680 ) -> Result<String, OpenIAPError> {
3681 if config.robotid.is_empty() {
3682 return Err(OpenIAPError::ClientError(
3683 "No robot id provided".to_string(),
3684 ));
3685 }
3686 if config.workflowid.is_empty() {
3687 return Err(OpenIAPError::ClientError(
3688 "No workflow id provided".to_string(),
3689 ));
3690 }
3691
3692 let (tx, rx) = oneshot::channel::<String>();
3693 let tx = Arc::new(std::sync::Mutex::new(Some(tx)));
3694
3695 let q = self
3696 .register_queue(
3697 RegisterQueueRequest {
3698 queuename: "".to_string(),
3699 },
3700 Box::new(move |event| {
3701 let json = event.data.clone();
3702 let obj = serde_json::from_str::<serde_json::Value>(&json).unwrap();
3703 let command: String = obj["command"].as_str().unwrap().to_string();
3704 debug!("Received event: {:?}", event);
3705 if command.eq("invokesuccess") {
3706 debug!("Robot successfully started running workflow");
3707 } else if command.eq("invokeidle") {
3708 debug!("Workflow went idle");
3709 } else if command.eq("invokeerror") {
3710 debug!("Robot failed to run workflow");
3711 let tx = tx.lock().unwrap().take().unwrap();
3712 tx.send(event.data).unwrap();
3713 } else if command.eq("timeout") {
3714 debug!("No robot picked up the workflow");
3715 let tx = tx.lock().unwrap().take().unwrap();
3716 tx.send(event.data).unwrap();
3717 } else if command.eq("invokecompleted") {
3718 debug!("Robot completed running workflow");
3719 let tx = tx.lock().unwrap().take().unwrap();
3720 tx.send(event.data).unwrap();
3721 } else {
3722 let tx = tx.lock().unwrap().take().unwrap();
3723 tx.send(event.data).unwrap();
3724 }
3725 }),
3726 )
3727 .await
3728 .unwrap();
3729 debug!("Registered Response Queue: {:?}", q);
3730 let data = format!(
3731 "{{\"command\":\"invoke\",\"workflowid\":\"{}\",\"payload\": {}}}",
3732 config.workflowid, config.payload
3733 );
3734 debug!("Send Data: {}", data);
3735 debug!("To Queue: {} With reply to: {}", config.robotid, q);
3736 let config = QueueMessageRequest {
3737 queuename: config.robotid.clone(),
3738 replyto: q.clone(),
3739 data,
3740 ..Default::default()
3741 };
3742
3743 let envelope = config.to_envelope();
3744
3745 let result = self.send(envelope).await;
3746 match result {
3747 Ok(m) => {
3748 let data = match m.data {
3749 Some(d) => d,
3750 None => {
3751 return Err(OpenIAPError::ClientError("No data in response".to_string()))
3752 }
3753 };
3754 if m.command == "error" {
3755 let e: ErrorResponse = prost::Message::decode(data.value.as_ref())
3756 .map_err(|e| OpenIAPError::CustomError(e.to_string()))?;
3757 return Err(OpenIAPError::ServerError(format!("{:?}", e.message)));
3758 }
3759 let json = rx.await.unwrap();
3763 debug!("Received json result: {:?}", json);
3764 let obj = serde_json::from_str::<serde_json::Value>(&json).unwrap();
3765 let command: String = obj["command"].as_str().unwrap().to_string();
3766 let mut data = "".to_string();
3767 if obj["data"].as_str().is_some() {
3768 data = obj["data"].as_str().unwrap().to_string();
3769 } else if obj["data"].as_object().is_some() {
3770 data = obj["data"].to_string();
3771 }
3772 if !command.eq("invokecompleted") {
3773 if command.eq("timeout") {
3774 return Err(OpenIAPError::ServerError("Timeout".to_string()));
3775 } else {
3776 if data.is_empty() {
3777 return Err(OpenIAPError::ServerError(
3778 "Error with no message".to_string(),
3779 ));
3780 }
3781 return Err(OpenIAPError::ServerError(data));
3782 }
3783 }
3784 let response = self.unregister_queue(&q).await;
3785 match response {
3786 Ok(_) => {
3787 debug!("Unregistered Response Queue: {:?}", q);
3788 }
3789 Err(e) => {
3790 error!("Failed to unregister Response Queue: {:?}", e);
3791 }
3792 }
3793 Ok(data)
3794 }
3795 Err(e) => Err(OpenIAPError::ClientError(e.to_string())),
3796 }
3797 }
3798}
3799