1use crate::dapr::proto::common::v1::job_failure_policy::Policy;
2use crate::dapr::proto::common::v1::JobFailurePolicyConstant;
3use crate::dapr::proto::{common::v1 as common_v1, runtime::v1 as dapr_v1};
4use crate::error::Error;
5use async_trait::async_trait;
6use futures::StreamExt;
7use prost_types::Any;
8use serde::{Deserialize, Serialize};
9use serde_json::Value;
10use std::collections::HashMap;
11use std::time::Duration;
12use tokio::io::AsyncRead;
13use tonic::codegen::tokio_stream;
14use tonic::{transport::Channel as TonicChannel, Request};
15use tonic::{Status, Streaming};
16
17#[derive(Clone)]
18pub struct Client<T>(T);
19
20impl<T: DaprInterface> Client<T> {
21 pub async fn connect(addr: String) -> Result<Self, Error> {
27 let port: u16 = std::env::var("DAPR_GRPC_PORT")?.parse()?;
29 let address = format!("{addr}:{port}");
30
31 Ok(Client(T::connect(address).await?))
32 }
33
34 pub async fn connect_with_port(addr: String, port: String) -> Result<Self, Error> {
41 let port: u16 = match port.parse::<u16>() {
43 Ok(p) => p,
44 Err(_) => {
45 panic!("Port must be a number between 1 and 65535");
46 }
47 };
48
49 let address = format!("{addr}:{port}");
50
51 Ok(Client(T::connect(address).await?))
52 }
53
54 pub async fn invoke_service<I, M>(
62 &mut self,
63 app_id: I,
64 method_name: M,
65 data: Option<Any>,
66 ) -> Result<InvokeServiceResponse, Error>
67 where
68 I: Into<String>,
69 M: Into<String>,
70 {
71 self.0
72 .invoke_service(InvokeServiceRequest {
73 id: app_id.into(),
74 message: common_v1::InvokeRequest {
75 method: method_name.into(),
76 data,
77 ..Default::default()
78 }
79 .into(),
80 })
81 .await
82 }
83
84 pub async fn invoke_binding<S>(
91 &mut self,
92 name: S,
93 data: Vec<u8>,
94 operation: S,
95 metadata: Option<HashMap<String, String>>,
96 ) -> Result<InvokeBindingResponse, Error>
97 where
98 S: Into<String>,
99 {
100 let mut mdata = HashMap::<String, String>::new();
101 if let Some(m) = metadata {
102 mdata = m;
103 }
104
105 self.0
106 .invoke_binding(InvokeBindingRequest {
107 name: name.into(),
108 data,
109 operation: operation.into(),
110 metadata: mdata,
111 })
112 .await
113 }
114
115 pub async fn publish_event<S>(
125 &mut self,
126 pubsub_name: S,
127 topic: S,
128 data_content_type: S,
129 data: Vec<u8>,
130 metadata: Option<HashMap<String, String>>,
131 ) -> Result<(), Error>
132 where
133 S: Into<String>,
134 {
135 let mut mdata = HashMap::<String, String>::new();
136 if let Some(m) = metadata {
137 mdata = m;
138 }
139 self.0
140 .publish_event(PublishEventRequest {
141 pubsub_name: pubsub_name.into(),
142 topic: topic.into(),
143 data_content_type: data_content_type.into(),
144 data,
145 metadata: mdata,
146 })
147 .await
148 }
149
150 pub async fn get_secret<S>(&mut self, store_name: S, key: S) -> Result<GetSecretResponse, Error>
157 where
158 S: Into<String>,
159 {
160 self.0
161 .get_secret(GetSecretRequest {
162 store_name: store_name.into(),
163 key: key.into(),
164 ..Default::default()
165 })
166 .await
167 }
168
169 pub async fn get_bulk_secret<S>(
175 &mut self,
176 store_name: S,
177 metadata: Option<HashMap<String, String>>,
178 ) -> Result<GetBulkSecretResponse, Error>
179 where
180 S: Into<String>,
181 {
182 self.0
183 .get_bulk_secret(GetBulkSecretRequest {
184 store_name: store_name.into(),
185 metadata: metadata.unwrap_or_default(),
186 })
187 .await
188 }
189
190 pub async fn get_state<S>(
198 &mut self,
199 store_name: S,
200 key: S,
201 metadata: Option<HashMap<String, String>>,
202 ) -> Result<GetStateResponse, Error>
203 where
204 S: Into<String>,
205 {
206 let mut mdata = HashMap::<String, String>::new();
207 if let Some(m) = metadata {
208 mdata = m;
209 }
210
211 self.0
212 .get_state(GetStateRequest {
213 store_name: store_name.into(),
214 key: key.into(),
215 metadata: mdata,
216 ..Default::default()
217 })
218 .await
219 }
220
221 pub async fn save_state<S>(
234 &mut self,
235 store_name: S,
236 key: S,
237 value: Vec<u8>,
238 etag: Option<Etag>,
239 metadata: Option<HashMap<String, String>>,
240 options: Option<StateOptions>,
241 ) -> Result<(), Error>
242 where
243 S: Into<String>,
244 {
245 let states = vec![StateItem {
246 key: key.into(),
247 value,
248 etag,
249 metadata: metadata.unwrap_or_default(),
250 options,
251 }];
252
253 self.save_bulk_states(store_name, states).await
254 }
255
256 pub async fn save_bulk_states<S, I>(&mut self, store_name: S, items: I) -> Result<(), Error>
263 where
264 S: Into<String>,
265 I: Into<Vec<StateItem>>,
266 {
267 self.0
268 .save_state(SaveStateRequest {
269 store_name: store_name.into(),
270 states: items.into(),
271 })
272 .await
273 }
274
275 pub async fn query_state_alpha1<S>(
282 &mut self,
283 store_name: S,
284 query: Value,
285 metadata: Option<HashMap<String, String>>,
286 ) -> Result<QueryStateResponse, Error>
287 where
288 S: Into<String>,
289 {
290 let mut mdata = HashMap::<String, String>::new();
291 if let Some(m) = metadata {
292 mdata = m;
293 }
294
295 self.0
296 .query_state_alpha1(QueryStateRequest {
297 store_name: store_name.into(),
298 query: serde_json::to_string(&query).unwrap(),
299 metadata: mdata,
300 })
301 .await
302 }
303
304 pub async fn delete_bulk_state<I, K>(&mut self, store_name: K, states: I) -> Result<(), Error>
311 where
312 I: IntoIterator<Item = (K, Vec<u8>)>,
313 K: Into<String>,
314 {
315 self.0
316 .delete_bulk_state(DeleteBulkStateRequest {
317 store_name: store_name.into(),
318 states: states.into_iter().map(|pair| pair.into()).collect(),
319 })
320 .await
321 }
322
323 pub async fn delete_state<S>(
330 &mut self,
331 store_name: S,
332 key: S,
333 metadata: Option<HashMap<String, String>>,
334 ) -> Result<(), Error>
335 where
336 S: Into<String>,
337 {
338 let mut mdata = HashMap::<String, String>::new();
339 if let Some(m) = metadata {
340 mdata = m;
341 }
342
343 self.0
344 .delete_state(DeleteStateRequest {
345 store_name: store_name.into(),
346 key: key.into(),
347 metadata: mdata,
348 ..Default::default()
349 })
350 .await
351 }
352
353 pub async fn set_metadata<S>(&mut self, key: S, value: S) -> Result<(), Error>
360 where
361 S: Into<String>,
362 {
363 self.0
364 .set_metadata(SetMetadataRequest {
365 key: key.into(),
366 value: value.into(),
367 })
368 .await
369 }
370
371 pub async fn get_metadata(&mut self) -> Result<GetMetadataResponse, Error> {
374 self.0.get_metadata().await
375 }
376
377 pub async fn invoke_actor<I, M, TInput, TOutput>(
386 &mut self,
387 actor_type: I,
388 actor_id: I,
389 method_name: M,
390 input: TInput,
391 metadata: Option<HashMap<String, String>>,
392 ) -> Result<TOutput, Error>
393 where
394 I: Into<String>,
395 M: Into<String>,
396 TInput: Serialize,
397 TOutput: for<'a> Deserialize<'a>,
398 {
399 let mut mdata = HashMap::<String, String>::new();
400 if let Some(m) = metadata {
401 mdata = m;
402 }
403
404 mdata.insert("Content-Type".to_string(), "application/json".to_string());
405
406 let data = match serde_json::to_vec(&input) {
407 Ok(data) => data,
408 Err(_e) => return Err(Error::SerializationError),
409 };
410
411 let res = self
412 .0
413 .invoke_actor(InvokeActorRequest {
414 actor_type: actor_type.into(),
415 actor_id: actor_id.into(),
416 method: method_name.into(),
417 data,
418 metadata: mdata,
419 })
420 .await?;
421
422 match serde_json::from_slice::<TOutput>(&res.data) {
423 Ok(output) => Ok(output),
424 Err(_e) => Err(Error::SerializationError),
425 }
426 }
427
428 pub async fn get_configuration<S, K>(
435 &mut self,
436 store_name: S,
437 keys: Vec<K>,
438 metadata: Option<HashMap<String, String>>,
439 ) -> Result<GetConfigurationResponse, Error>
440 where
441 S: Into<String>,
442 K: Into<String>,
443 {
444 let request = GetConfigurationRequest {
445 store_name: store_name.into(),
446 keys: keys.into_iter().map(|key| key.into()).collect(),
447 metadata: metadata.unwrap_or_default(),
448 };
449 self.0.get_configuration(request).await
450 }
451
452 pub async fn subscribe_configuration<S>(
454 &mut self,
455 store_name: S,
456 keys: Vec<S>,
457 metadata: Option<HashMap<String, String>>,
458 ) -> Result<Streaming<SubscribeConfigurationResponse>, Error>
459 where
460 S: Into<String>,
461 {
462 let request = SubscribeConfigurationRequest {
463 store_name: store_name.into(),
464 keys: keys.into_iter().map(|key| key.into()).collect(),
465 metadata: metadata.unwrap_or_default(),
466 };
467 self.0.subscribe_configuration(request).await
468 }
469
470 pub async fn unsubscribe_configuration<S>(
472 &mut self,
473 store_name: S,
474 id: S,
475 ) -> Result<UnsubscribeConfigurationResponse, Error>
476 where
477 S: Into<String>,
478 {
479 let request = UnsubscribeConfigurationRequest {
480 id: id.into(),
481 store_name: store_name.into(),
482 };
483 self.0.unsubscribe_configuration(request).await
484 }
485
486 pub async fn encrypt<R>(
493 &mut self,
494 payload: ReaderStream<R>,
495 request_options: EncryptRequestOptions,
496 ) -> Result<Vec<StreamPayload>, Status>
497 where
498 R: AsyncRead + Send,
499 {
500 let request_options = &Some(request_options);
502 let requested_items: Vec<EncryptRequest> = payload
503 .0
504 .enumerate()
505 .fold(vec![], |mut init, (i, bytes)| async move {
506 let stream_payload = StreamPayload {
507 data: bytes.unwrap().to_vec(),
508 seq: 0,
509 };
510 if i == 0 {
511 init.push(EncryptRequest {
512 options: request_options.clone(),
513 payload: Some(stream_payload),
514 });
515 } else {
516 init.push(EncryptRequest {
517 options: None,
518 payload: Some(stream_payload),
519 });
520 }
521 init
522 })
523 .await;
524 self.0.encrypt(requested_items).await
525 }
526
527 pub async fn decrypt(
534 &mut self,
535 encrypted: Vec<StreamPayload>,
536 options: DecryptRequestOptions,
537 ) -> Result<Vec<u8>, Status> {
538 let requested_items: Vec<DecryptRequest> = encrypted
539 .iter()
540 .enumerate()
541 .map(|(i, item)| {
542 if i == 0 {
543 DecryptRequest {
544 options: Some(options.clone()),
545 payload: Some(item.clone()),
546 }
547 } else {
548 DecryptRequest {
549 options: None,
550 payload: Some(item.clone()),
551 }
552 }
553 })
554 .collect();
555 self.0.decrypt(requested_items).await
556 }
557
558 pub async fn schedule_job_alpha1(
565 &mut self,
566 job: Job,
567 overwrite: Option<bool>,
568 ) -> Result<ScheduleJobResponse, Error> {
569 let request = ScheduleJobRequest {
570 job: Some(job.clone()),
571 overwrite: overwrite.unwrap_or(false),
572 };
573 self.0.schedule_job_alpha1(request).await
574 }
575
576 pub async fn get_job_alpha1(&mut self, name: &str) -> Result<GetJobResponse, Error> {
582 let request = GetJobRequest {
583 name: name.to_string(),
584 };
585 self.0.get_job_alpha1(request).await
586 }
587
588 pub async fn delete_job_alpha1(&mut self, name: &str) -> Result<DeleteJobResponse, Error> {
594 let request = DeleteJobRequest {
595 name: name.to_string(),
596 };
597 self.0.delete_job_alpha1(request).await
598 }
599
600 pub async fn converse_alpha1(
606 &mut self,
607 request: ConversationRequest,
608 ) -> Result<ConversationResponse, Error> {
609 self.0.converse_alpha1(request).await
610 }
611}
612
613#[async_trait]
614pub trait DaprInterface: Sized {
615 async fn connect(addr: String) -> Result<Self, Error>;
616 async fn publish_event(&mut self, request: PublishEventRequest) -> Result<(), Error>;
617 async fn invoke_service(
618 &mut self,
619 request: InvokeServiceRequest,
620 ) -> Result<InvokeServiceResponse, Error>;
621 async fn invoke_binding(
622 &mut self,
623 request: InvokeBindingRequest,
624 ) -> Result<InvokeBindingResponse, Error>;
625 async fn get_secret(&mut self, request: GetSecretRequest) -> Result<GetSecretResponse, Error>;
626 async fn get_bulk_secret(
627 &mut self,
628 request: GetBulkSecretRequest,
629 ) -> Result<GetBulkSecretResponse, Error>;
630 async fn get_state(&mut self, request: GetStateRequest) -> Result<GetStateResponse, Error>;
631 async fn save_state(&mut self, request: SaveStateRequest) -> Result<(), Error>;
632 async fn query_state_alpha1(
633 &mut self,
634 request: QueryStateRequest,
635 ) -> Result<QueryStateResponse, Error>;
636 async fn delete_state(&mut self, request: DeleteStateRequest) -> Result<(), Error>;
637 async fn delete_bulk_state(&mut self, request: DeleteBulkStateRequest) -> Result<(), Error>;
638 async fn set_metadata(&mut self, request: SetMetadataRequest) -> Result<(), Error>;
639 async fn get_metadata(&mut self) -> Result<GetMetadataResponse, Error>;
640 async fn invoke_actor(
641 &mut self,
642 request: InvokeActorRequest,
643 ) -> Result<InvokeActorResponse, Error>;
644 async fn get_configuration(
645 &mut self,
646 request: GetConfigurationRequest,
647 ) -> Result<GetConfigurationResponse, Error>;
648 async fn subscribe_configuration(
649 &mut self,
650 request: SubscribeConfigurationRequest,
651 ) -> Result<Streaming<SubscribeConfigurationResponse>, Error>;
652 async fn unsubscribe_configuration(
653 &mut self,
654 request: UnsubscribeConfigurationRequest,
655 ) -> Result<UnsubscribeConfigurationResponse, Error>;
656
657 async fn encrypt(&mut self, payload: Vec<EncryptRequest>)
658 -> Result<Vec<StreamPayload>, Status>;
659
660 async fn decrypt(&mut self, payload: Vec<DecryptRequest>) -> Result<Vec<u8>, Status>;
661
662 async fn schedule_job_alpha1(
663 &mut self,
664 request: ScheduleJobRequest,
665 ) -> Result<ScheduleJobResponse, Error>;
666
667 async fn get_job_alpha1(&mut self, request: GetJobRequest) -> Result<GetJobResponse, Error>;
668
669 async fn delete_job_alpha1(
670 &mut self,
671 request: DeleteJobRequest,
672 ) -> Result<DeleteJobResponse, Error>;
673
674 async fn converse_alpha1(
675 &mut self,
676 request: ConversationRequest,
677 ) -> Result<ConversationResponse, Error>;
678}
679
680#[async_trait]
681impl DaprInterface for dapr_v1::dapr_client::DaprClient<TonicChannel> {
682 async fn connect(addr: String) -> Result<Self, Error> {
683 Ok(dapr_v1::dapr_client::DaprClient::connect(addr).await?)
684 }
685
686 async fn publish_event(&mut self, request: PublishEventRequest) -> Result<(), Error> {
687 self.publish_event(Request::new(request))
688 .await?
689 .into_inner();
690 Ok(())
691 }
692
693 async fn invoke_service(
694 &mut self,
695 request: InvokeServiceRequest,
696 ) -> Result<InvokeServiceResponse, Error> {
697 Ok(self
698 .invoke_service(Request::new(request))
699 .await?
700 .into_inner())
701 }
702
703 async fn invoke_binding(
704 &mut self,
705 request: InvokeBindingRequest,
706 ) -> Result<InvokeBindingResponse, Error> {
707 Ok(self
708 .invoke_binding(Request::new(request))
709 .await?
710 .into_inner())
711 }
712
713 async fn get_secret(&mut self, request: GetSecretRequest) -> Result<GetSecretResponse, Error> {
714 Ok(self.get_secret(Request::new(request)).await?.into_inner())
715 }
716
717 async fn get_bulk_secret(
718 &mut self,
719 request: GetBulkSecretRequest,
720 ) -> Result<GetBulkSecretResponse, Error> {
721 Ok(self
722 .get_bulk_secret(Request::new(request))
723 .await?
724 .into_inner())
725 }
726
727 async fn get_state(&mut self, request: GetStateRequest) -> Result<GetStateResponse, Error> {
728 Ok(self.get_state(Request::new(request)).await?.into_inner())
729 }
730
731 async fn save_state(&mut self, request: SaveStateRequest) -> Result<(), Error> {
732 self.save_state(Request::new(request)).await?.into_inner();
733 Ok(())
734 }
735
736 async fn query_state_alpha1(
737 &mut self,
738 request: QueryStateRequest,
739 ) -> Result<QueryStateResponse, Error> {
740 Ok(self
741 .query_state_alpha1(Request::new(request))
742 .await?
743 .into_inner())
744 }
745
746 async fn delete_state(&mut self, request: DeleteStateRequest) -> Result<(), Error> {
747 self.delete_state(Request::new(request)).await?.into_inner();
748 Ok(())
749 }
750
751 async fn delete_bulk_state(&mut self, request: DeleteBulkStateRequest) -> Result<(), Error> {
752 self.delete_bulk_state(Request::new(request))
753 .await?
754 .into_inner();
755 Ok(())
756 }
757
758 async fn set_metadata(&mut self, request: SetMetadataRequest) -> Result<(), Error> {
759 self.set_metadata(Request::new(request)).await?.into_inner();
760 Ok(())
761 }
762
763 async fn get_metadata(&mut self) -> Result<GetMetadataResponse, Error> {
764 Ok(self.get_metadata(GetMetadataRequest {}).await?.into_inner())
765 }
766
767 async fn invoke_actor(
768 &mut self,
769 request: InvokeActorRequest,
770 ) -> Result<InvokeActorResponse, Error> {
771 Ok(self.invoke_actor(Request::new(request)).await?.into_inner())
772 }
773
774 async fn get_configuration(
775 &mut self,
776 request: GetConfigurationRequest,
777 ) -> Result<GetConfigurationResponse, Error> {
778 Ok(self
779 .get_configuration(Request::new(request))
780 .await?
781 .into_inner())
782 }
783
784 async fn subscribe_configuration(
785 &mut self,
786 request: SubscribeConfigurationRequest,
787 ) -> Result<Streaming<SubscribeConfigurationResponse>, Error> {
788 Ok(self
789 .subscribe_configuration(Request::new(request))
790 .await?
791 .into_inner())
792 }
793
794 async fn unsubscribe_configuration(
795 &mut self,
796 request: UnsubscribeConfigurationRequest,
797 ) -> Result<UnsubscribeConfigurationResponse, Error> {
798 Ok(self
799 .unsubscribe_configuration(Request::new(request))
800 .await?
801 .into_inner())
802 }
803
804 async fn encrypt(
811 &mut self,
812 request: Vec<EncryptRequest>,
813 ) -> Result<Vec<StreamPayload>, Status> {
814 let request = Request::new(tokio_stream::iter(request));
815 let stream = self.encrypt_alpha1(request).await?;
816 let mut stream = stream.into_inner();
817 let mut return_data = vec![];
818 while let Some(resp) = stream.next().await {
819 if let Ok(resp) = resp {
820 if let Some(data) = resp.payload {
821 return_data.push(data)
822 }
823 }
824 }
825 Ok(return_data)
826 }
827
828 async fn decrypt(&mut self, request: Vec<DecryptRequest>) -> Result<Vec<u8>, Status> {
835 let request = Request::new(tokio_stream::iter(request));
836 let stream = self.decrypt_alpha1(request).await?;
837 let mut stream = stream.into_inner();
838 let mut data = vec![];
839 while let Some(resp) = stream.next().await {
840 if let Ok(resp) = resp {
841 if let Some(mut payload) = resp.payload {
842 data.append(payload.data.as_mut())
843 }
844 }
845 }
846 Ok(data)
847 }
848
849 async fn schedule_job_alpha1(
850 &mut self,
851 request: ScheduleJobRequest,
852 ) -> Result<ScheduleJobResponse, Error> {
853 Ok(self.schedule_job_alpha1(request).await?.into_inner())
854 }
855
856 async fn get_job_alpha1(&mut self, request: GetJobRequest) -> Result<GetJobResponse, Error> {
857 Ok(self
858 .get_job_alpha1(Request::new(request))
859 .await?
860 .into_inner())
861 }
862
863 async fn delete_job_alpha1(
864 &mut self,
865 request: DeleteJobRequest,
866 ) -> Result<DeleteJobResponse, Error> {
867 Ok(self
868 .delete_job_alpha1(Request::new(request))
869 .await?
870 .into_inner())
871 }
872
873 async fn converse_alpha1(
874 &mut self,
875 request: ConversationRequest,
876 ) -> Result<ConversationResponse, Error> {
877 Ok(self
878 .converse_alpha1(Request::new(request))
879 .await?
880 .into_inner())
881 }
882}
883
884pub type InvokeServiceRequest = dapr_v1::InvokeServiceRequest;
886
887pub type InvokeServiceResponse = common_v1::InvokeResponse;
889
890pub type InvokeBindingRequest = dapr_v1::InvokeBindingRequest;
892
893pub type InvokeBindingResponse = dapr_v1::InvokeBindingResponse;
895
896pub type PublishEventRequest = dapr_v1::PublishEventRequest;
898
899pub type GetStateRequest = dapr_v1::GetStateRequest;
901
902pub type GetStateResponse = dapr_v1::GetStateResponse;
904
905pub type SaveStateRequest = dapr_v1::SaveStateRequest;
907
908pub type StateItem = common_v1::StateItem;
910
911pub type StateOptions = common_v1::StateOptions;
913
914pub type Etag = common_v1::Etag;
916
917pub type QueryStateRequest = dapr_v1::QueryStateRequest;
919
920pub type QueryStateResponse = dapr_v1::QueryStateResponse;
922
923pub type DeleteStateRequest = dapr_v1::DeleteStateRequest;
925
926pub type DeleteBulkStateRequest = dapr_v1::DeleteBulkStateRequest;
928
929pub type GetSecretRequest = dapr_v1::GetSecretRequest;
931
932pub type GetSecretResponse = dapr_v1::GetSecretResponse;
934
935pub type GetBulkSecretRequest = dapr_v1::GetBulkSecretRequest;
937
938pub type GetBulkSecretResponse = dapr_v1::GetBulkSecretResponse;
940
941pub type GetMetadataResponse = dapr_v1::GetMetadataResponse;
943
944pub type GetMetadataRequest = dapr_v1::GetMetadataRequest;
946
947pub type SetMetadataRequest = dapr_v1::SetMetadataRequest;
949
950pub type InvokeActorRequest = dapr_v1::InvokeActorRequest;
952
953pub type InvokeActorResponse = dapr_v1::InvokeActorResponse;
955pub type GetConfigurationRequest = dapr_v1::GetConfigurationRequest;
957
958pub type GetConfigurationResponse = dapr_v1::GetConfigurationResponse;
960
961pub type SubscribeConfigurationRequest = dapr_v1::SubscribeConfigurationRequest;
963
964pub type SubscribeConfigurationResponse = dapr_v1::SubscribeConfigurationResponse;
966
967pub type UnsubscribeConfigurationRequest = dapr_v1::UnsubscribeConfigurationRequest;
969
970pub type UnsubscribeConfigurationResponse = dapr_v1::UnsubscribeConfigurationResponse;
972
973pub type TonicClient = dapr_v1::dapr_client::DaprClient<TonicChannel>;
975
976pub type EncryptRequest = crate::dapr::proto::runtime::v1::EncryptRequest;
978
979pub type DecryptRequest = crate::dapr::proto::runtime::v1::DecryptRequest;
981
982pub type EncryptRequestOptions = crate::dapr::proto::runtime::v1::EncryptRequestOptions;
984
985pub type DecryptRequestOptions = crate::dapr::proto::runtime::v1::DecryptRequestOptions;
987
988pub type Job = crate::dapr::proto::runtime::v1::Job;
990
991pub type JobFailurePolicy = crate::dapr::proto::common::v1::JobFailurePolicy;
993
994pub type ScheduleJobRequest = crate::dapr::proto::runtime::v1::ScheduleJobRequest;
996
997pub type ScheduleJobResponse = crate::dapr::proto::runtime::v1::ScheduleJobResponse;
999
1000pub type GetJobRequest = crate::dapr::proto::runtime::v1::GetJobRequest;
1002
1003pub type GetJobResponse = crate::dapr::proto::runtime::v1::GetJobResponse;
1005
1006pub type DeleteJobRequest = crate::dapr::proto::runtime::v1::DeleteJobRequest;
1008
1009pub type DeleteJobResponse = crate::dapr::proto::runtime::v1::DeleteJobResponse;
1011
1012pub type ConversationRequest = crate::dapr::proto::runtime::v1::ConversationRequest;
1014
1015pub type ConversationResponse = crate::dapr::proto::runtime::v1::ConversationResponse;
1017
1018pub type ConversationResult = crate::dapr::proto::runtime::v1::ConversationResult;
1020
1021pub type ConversationInput = crate::dapr::proto::runtime::v1::ConversationInput;
1023
1024type StreamPayload = crate::dapr::proto::common::v1::StreamPayload;
1025impl<K> From<(K, Vec<u8>)> for common_v1::StateItem
1026where
1027 K: Into<String>,
1028{
1029 fn from((key, value): (K, Vec<u8>)) -> Self {
1030 common_v1::StateItem {
1031 key: key.into(),
1032 value,
1033 ..Default::default()
1034 }
1035 }
1036}
1037
1038pub struct ReaderStream<T>(tokio_util::io::ReaderStream<T>);
1039
1040impl<T: AsyncRead> ReaderStream<T> {
1041 pub fn new(data: T) -> Self {
1042 ReaderStream(tokio_util::io::ReaderStream::new(data))
1043 }
1044}
1045
1046pub struct JobBuilder {
1047 schedule: Option<String>,
1048 data: Option<Any>,
1049 name: String,
1050 ttl: Option<String>,
1051 repeats: Option<u32>,
1052 due_time: Option<String>,
1053 failure_policy: Option<JobFailurePolicy>,
1054}
1055
1056impl JobBuilder {
1057 pub fn new(name: &str) -> Self {
1059 JobBuilder {
1060 schedule: None,
1061 data: None,
1062 name: name.to_string(),
1063 ttl: None,
1064 repeats: None,
1065 due_time: None,
1066 failure_policy: None,
1067 }
1068 }
1069
1070 pub fn with_schedule(mut self, schedule: &str) -> Self {
1071 self.schedule = Some(schedule.into());
1072 self
1073 }
1074
1075 pub fn with_data(mut self, data: Any) -> Self {
1076 self.data = Some(data);
1077 self
1078 }
1079
1080 pub fn with_ttl(mut self, ttl: &str) -> Self {
1081 self.ttl = Some(ttl.into());
1082 self
1083 }
1084
1085 pub fn with_repeats(mut self, repeats: u32) -> Self {
1086 self.repeats = Some(repeats);
1087 self
1088 }
1089
1090 pub fn with_due_time(mut self, due_time: &str) -> Self {
1091 self.due_time = Some(due_time.into());
1092 self
1093 }
1094
1095 pub fn with_failure_policy(mut self, policy: JobFailurePolicy) -> Self {
1096 self.failure_policy = Some(policy);
1097 self
1098 }
1099
1100 pub fn build(self) -> Job {
1101 Job {
1102 schedule: self.schedule,
1103 data: self.data,
1104 name: self.name,
1105 ttl: self.ttl,
1106 repeats: self.repeats,
1107 due_time: self.due_time,
1108 failure_policy: self.failure_policy,
1109 }
1110 }
1111}
1112
1113pub enum JobFailurePolicyType {
1115 Drop {},
1116 Constant {},
1117}
1118
1119pub struct JobFailurePolicyBuilder {
1120 policy: JobFailurePolicyType,
1121 pub retry_interval: Option<Duration>,
1122 pub max_retries: Option<u32>,
1123}
1124
1125impl JobFailurePolicyBuilder {
1126 pub fn new(policy: JobFailurePolicyType) -> Self {
1127 JobFailurePolicyBuilder {
1128 policy,
1129 retry_interval: None,
1130 max_retries: None,
1131 }
1132 }
1133
1134 pub fn with_retry_interval(mut self, interval: Duration) -> Self {
1135 self.retry_interval = Some(interval);
1137 self
1138 }
1139
1140 pub fn with_max_retries(mut self, max_retries: u32) -> Self {
1141 self.max_retries = Some(max_retries);
1142 self
1143 }
1144
1145 pub fn build(self) -> common_v1::JobFailurePolicy {
1146 match self.policy {
1147 JobFailurePolicyType::Drop {} => common_v1::JobFailurePolicy {
1148 policy: Some(Policy::Drop(Default::default())),
1149 },
1150 JobFailurePolicyType::Constant {} => JobFailurePolicy {
1151 policy: Some(Policy::Constant(JobFailurePolicyConstant {
1152 interval: self.retry_interval.map(|interval| {
1153 prost_types::Duration::try_from(interval)
1154 .expect("Failed to convert Duration")
1155 }),
1156 max_retries: self.max_retries,
1157 })),
1158 },
1159 }
1160 }
1161}
1162
1163pub struct ConversationInputBuilder {
1164 content: String,
1165 role: Option<String>,
1166 scrub_pii: Option<bool>,
1167}
1168
1169impl ConversationInputBuilder {
1170 pub fn new(message: &str) -> Self {
1171 ConversationInputBuilder {
1172 content: message.to_string(),
1173 role: None,
1174 scrub_pii: None,
1175 }
1176 }
1177
1178 pub fn build(self) -> ConversationInput {
1179 ConversationInput {
1180 content: self.content,
1181 role: self.role,
1182 scrub_pii: self.scrub_pii,
1183 }
1184 }
1185}
1186
1187pub struct ConversationRequestBuilder {
1188 name: String,
1189 context_id: Option<String>,
1190 inputs: Vec<ConversationInput>,
1191 parameters: HashMap<String, Any>,
1192 metadata: HashMap<String, String>,
1193 scrub_pii: Option<bool>,
1194 temperature: Option<f64>,
1195}
1196impl ConversationRequestBuilder {
1197 pub fn new(name: &str, inputs: Vec<ConversationInput>) -> Self {
1198 ConversationRequestBuilder {
1199 name: name.to_string(),
1200 context_id: None,
1201 inputs,
1202 parameters: Default::default(),
1203 metadata: Default::default(),
1204 scrub_pii: None,
1205 temperature: None,
1206 }
1207 }
1208
1209 pub fn build(self) -> ConversationRequest {
1210 ConversationRequest {
1211 name: self.name,
1212 context_id: self.context_id,
1213 inputs: self.inputs,
1214 parameters: self.parameters,
1215 metadata: self.metadata,
1216 scrub_pii: self.scrub_pii,
1217 temperature: self.temperature,
1218 }
1219 }
1220}