dapr/
client.rs

1use crate::dapr::proto::common::v1::job_failure_policy::Policy;
2use crate::dapr::proto::common::v1::JobFailurePolicyConstant;
3use crate::dapr::proto::{common::v1 as common_v1, runtime::v1 as dapr_v1};
4use crate::error::Error;
5use async_trait::async_trait;
6use futures::StreamExt;
7use prost_types::Any;
8use serde::{Deserialize, Serialize};
9use serde_json::Value;
10use std::collections::HashMap;
11use std::time::Duration;
12use tokio::io::AsyncRead;
13use tonic::codegen::tokio_stream;
14use tonic::{transport::Channel as TonicChannel, Request};
15use tonic::{Status, Streaming};
16
/// High-level Dapr client.
///
/// A newtype around a transport `T`; the methods in the `impl` block below are
/// available when `T` implements [`DaprInterface`] and forward to it.
#[derive(Clone)]
pub struct Client<T>(T);
19
20impl<T: DaprInterface> Client<T> {
21    /// Connect to a Dapr enabled app.
22    ///
23    /// # Arguments
24    ///
25    /// * `addr` - Address of gRPC server to connect to.
26    pub async fn connect(addr: String) -> Result<Self, Error> {
27        // Get the Dapr port to create a connection
28        let port: u16 = std::env::var("DAPR_GRPC_PORT")?.parse()?;
29        let address = format!("{addr}:{port}");
30
31        Ok(Client(T::connect(address).await?))
32    }
33
34    /// Connect to the Dapr sidecar with a specific port.
35    ///
36    /// # Arguments
37    ///
38    /// * `addr` - Address of gRPC server to connect to.
39    /// * `port` - Port of the gRPC server to connect to.
40    pub async fn connect_with_port(addr: String, port: String) -> Result<Self, Error> {
41        // assert that port is between 1 and 65535
42        let port: u16 = match port.parse::<u16>() {
43            Ok(p) => p,
44            Err(_) => {
45                panic!("Port must be a number between 1 and 65535");
46            }
47        };
48
49        let address = format!("{addr}:{port}");
50
51        Ok(Client(T::connect(address).await?))
52    }
53
54    /// Invoke a method in a Dapr enabled app.
55    ///
56    /// # Arguments
57    ///
58    /// * `app_id` - Id of the application running.
59    /// * `method_name` - Name of the method to invoke.
60    /// * `data` - Required. Bytes value or data required to invoke service.
61    pub async fn invoke_service<I, M>(
62        &mut self,
63        app_id: I,
64        method_name: M,
65        data: Option<Any>,
66    ) -> Result<InvokeServiceResponse, Error>
67    where
68        I: Into<String>,
69        M: Into<String>,
70    {
71        self.0
72            .invoke_service(InvokeServiceRequest {
73                id: app_id.into(),
74                message: common_v1::InvokeRequest {
75                    method: method_name.into(),
76                    data,
77                    ..Default::default()
78                }
79                .into(),
80            })
81            .await
82    }
83
84    /// Invoke an Dapr output binding.
85    ///
86    /// # Arguments
87    ///
88    /// * `name` - The name of the output binding to invoke.
89    /// * `data` - The data which will be sent to the output binding.
90    pub async fn invoke_binding<S>(
91        &mut self,
92        name: S,
93        data: Vec<u8>,
94        operation: S,
95        metadata: Option<HashMap<String, String>>,
96    ) -> Result<InvokeBindingResponse, Error>
97    where
98        S: Into<String>,
99    {
100        let mut mdata = HashMap::<String, String>::new();
101        if let Some(m) = metadata {
102            mdata = m;
103        }
104
105        self.0
106            .invoke_binding(InvokeBindingRequest {
107                name: name.into(),
108                data,
109                operation: operation.into(),
110                metadata: mdata,
111            })
112            .await
113    }
114
115    /// Publish a payload to multiple consumers who are listening on a topic.
116    ///
117    /// Dapr guarantees at least once semantics for this endpoint.
118    ///
119    /// # Arguments
120    ///
121    /// * `pubsub_name` - Name of the pubsub component
122    /// * `topic` - Pubsub topic.
123    /// * `data` - The data which will be published to topic.
124    pub async fn publish_event<S>(
125        &mut self,
126        pubsub_name: S,
127        topic: S,
128        data_content_type: S,
129        data: Vec<u8>,
130        metadata: Option<HashMap<String, String>>,
131    ) -> Result<(), Error>
132    where
133        S: Into<String>,
134    {
135        let mut mdata = HashMap::<String, String>::new();
136        if let Some(m) = metadata {
137            mdata = m;
138        }
139        self.0
140            .publish_event(PublishEventRequest {
141                pubsub_name: pubsub_name.into(),
142                topic: topic.into(),
143                data_content_type: data_content_type.into(),
144                data,
145                metadata: mdata,
146            })
147            .await
148    }
149
150    /// Get the secret for a specific key.
151    ///
152    /// # Arguments
153    ///
154    /// * `store_name` - The name of secret store.
155    /// * `key` - The name of secret key.
156    pub async fn get_secret<S>(&mut self, store_name: S, key: S) -> Result<GetSecretResponse, Error>
157    where
158        S: Into<String>,
159    {
160        self.0
161            .get_secret(GetSecretRequest {
162                store_name: store_name.into(),
163                key: key.into(),
164                ..Default::default()
165            })
166            .await
167    }
168
169    /// Get all secrets for a given store
170    ///
171    /// # Arguments
172    ///
173    /// * `store_name` - The name of the secret store.
174    pub async fn get_bulk_secret<S>(
175        &mut self,
176        store_name: S,
177        metadata: Option<HashMap<String, String>>,
178    ) -> Result<GetBulkSecretResponse, Error>
179    where
180        S: Into<String>,
181    {
182        self.0
183            .get_bulk_secret(GetBulkSecretRequest {
184                store_name: store_name.into(),
185                metadata: metadata.unwrap_or_default(),
186            })
187            .await
188    }
189
190    /// Get the state for a specific key.
191    ///
192    /// # Arguments
193    ///
194    /// * `store_name` - The name of state store.
195    /// * `key` - The key of the desired state.
196    /// * `metadata` - Any metadata pairs to include in the request.
197    pub async fn get_state<S>(
198        &mut self,
199        store_name: S,
200        key: S,
201        metadata: Option<HashMap<String, String>>,
202    ) -> Result<GetStateResponse, Error>
203    where
204        S: Into<String>,
205    {
206        let mut mdata = HashMap::<String, String>::new();
207        if let Some(m) = metadata {
208            mdata = m;
209        }
210
211        self.0
212            .get_state(GetStateRequest {
213                store_name: store_name.into(),
214                key: key.into(),
215                metadata: mdata,
216                ..Default::default()
217            })
218            .await
219    }
220
221    /// Save an array of state objects.
222    ///
223    /// This does not include any etag or metadata options.
224    ///
225    /// # Arguments
226    ///
227    /// * `store_name` - The name of state store.
228    /// * `key` - The key for the value
229    /// * `value` - The value to be saved for the key
230    /// * `etag` - The etag identifier
231    /// * `metadata` - Any metadata pairs to include in the request.
232    /// * `options` - Any state option
233    pub async fn save_state<S>(
234        &mut self,
235        store_name: S,
236        key: S,
237        value: Vec<u8>,
238        etag: Option<Etag>,
239        metadata: Option<HashMap<String, String>>,
240        options: Option<StateOptions>,
241    ) -> Result<(), Error>
242    where
243        S: Into<String>,
244    {
245        let states = vec![StateItem {
246            key: key.into(),
247            value,
248            etag,
249            metadata: metadata.unwrap_or_default(),
250            options,
251        }];
252
253        self.save_bulk_states(store_name, states).await
254    }
255
256    /// Save an array of state objects.
257    ///
258    /// # Arguments
259    ///
260    /// * `store_name` - The name of state store.
261    /// * `items` - The array of the state items.
262    pub async fn save_bulk_states<S, I>(&mut self, store_name: S, items: I) -> Result<(), Error>
263    where
264        S: Into<String>,
265        I: Into<Vec<StateItem>>,
266    {
267        self.0
268            .save_state(SaveStateRequest {
269                store_name: store_name.into(),
270                states: items.into(),
271            })
272            .await
273    }
274
275    /// Query state objects based on specific query conditions
276    ///
277    /// # Arguments
278    ///
279    /// * `store_name` - The name of state store.
280    /// * `query` - The query request (json)
281    pub async fn query_state_alpha1<S>(
282        &mut self,
283        store_name: S,
284        query: Value,
285        metadata: Option<HashMap<String, String>>,
286    ) -> Result<QueryStateResponse, Error>
287    where
288        S: Into<String>,
289    {
290        let mut mdata = HashMap::<String, String>::new();
291        if let Some(m) = metadata {
292            mdata = m;
293        }
294
295        self.0
296            .query_state_alpha1(QueryStateRequest {
297                store_name: store_name.into(),
298                query: serde_json::to_string(&query).unwrap(),
299                metadata: mdata,
300            })
301            .await
302    }
303
304    /// Delete an array of state objects.
305    ///
306    /// # Arguments
307    ///
308    /// * `store_name` - The name of state store.
309    /// * `states` - The array of the state key values.
310    pub async fn delete_bulk_state<I, K>(&mut self, store_name: K, states: I) -> Result<(), Error>
311    where
312        I: IntoIterator<Item = (K, Vec<u8>)>,
313        K: Into<String>,
314    {
315        self.0
316            .delete_bulk_state(DeleteBulkStateRequest {
317                store_name: store_name.into(),
318                states: states.into_iter().map(|pair| pair.into()).collect(),
319            })
320            .await
321    }
322
323    /// Delete the state for a specific key.
324    ///
325    /// # Arguments
326    ///
327    /// * `store_name` - The name of state store.
328    /// * `key` - The key of the desired state.
329    pub async fn delete_state<S>(
330        &mut self,
331        store_name: S,
332        key: S,
333        metadata: Option<HashMap<String, String>>,
334    ) -> Result<(), Error>
335    where
336        S: Into<String>,
337    {
338        let mut mdata = HashMap::<String, String>::new();
339        if let Some(m) = metadata {
340            mdata = m;
341        }
342
343        self.0
344            .delete_state(DeleteStateRequest {
345                store_name: store_name.into(),
346                key: key.into(),
347                metadata: mdata,
348                ..Default::default()
349            })
350            .await
351    }
352
353    /// Set sidecar Metadata
354    ///
355    /// # Arguments
356    ///
357    /// * `key` - The metadata key
358    /// * `value` - The metadata value
359    pub async fn set_metadata<S>(&mut self, key: S, value: S) -> Result<(), Error>
360    where
361        S: Into<String>,
362    {
363        self.0
364            .set_metadata(SetMetadataRequest {
365                key: key.into(),
366                value: value.into(),
367            })
368            .await
369    }
370
371    /// Set sidecar Metadata
372    ///
373    pub async fn get_metadata(&mut self) -> Result<GetMetadataResponse, Error> {
374        self.0.get_metadata().await
375    }
376
377    /// Invoke a method in a Dapr actor.
378    ///
379    /// # Arguments
380    ///
381    /// * `actor_type` - Type of the actor.
382    /// * `actor_id` - Id of the actor.
383    /// * `method_name` - Name of the method to invoke.
384    /// * `input` - Required. Data required to invoke service, should be json serializable.
385    pub async fn invoke_actor<I, M, TInput, TOutput>(
386        &mut self,
387        actor_type: I,
388        actor_id: I,
389        method_name: M,
390        input: TInput,
391        metadata: Option<HashMap<String, String>>,
392    ) -> Result<TOutput, Error>
393    where
394        I: Into<String>,
395        M: Into<String>,
396        TInput: Serialize,
397        TOutput: for<'a> Deserialize<'a>,
398    {
399        let mut mdata = HashMap::<String, String>::new();
400        if let Some(m) = metadata {
401            mdata = m;
402        }
403
404        mdata.insert("Content-Type".to_string(), "application/json".to_string());
405
406        let data = match serde_json::to_vec(&input) {
407            Ok(data) => data,
408            Err(_e) => return Err(Error::SerializationError),
409        };
410
411        let res = self
412            .0
413            .invoke_actor(InvokeActorRequest {
414                actor_type: actor_type.into(),
415                actor_id: actor_id.into(),
416                method: method_name.into(),
417                data,
418                metadata: mdata,
419            })
420            .await?;
421
422        match serde_json::from_slice::<TOutput>(&res.data) {
423            Ok(output) => Ok(output),
424            Err(_e) => Err(Error::SerializationError),
425        }
426    }
427
428    /// Get the configuration for a specific key
429    /// ///
430    /// # Arguments
431    ///
432    /// * `store_name` - The name of config store.
433    /// * `keys` - The key of the desired configuration.
434    pub async fn get_configuration<S, K>(
435        &mut self,
436        store_name: S,
437        keys: Vec<K>,
438        metadata: Option<HashMap<String, String>>,
439    ) -> Result<GetConfigurationResponse, Error>
440    where
441        S: Into<String>,
442        K: Into<String>,
443    {
444        let request = GetConfigurationRequest {
445            store_name: store_name.into(),
446            keys: keys.into_iter().map(|key| key.into()).collect(),
447            metadata: metadata.unwrap_or_default(),
448        };
449        self.0.get_configuration(request).await
450    }
451
452    /// Subscribe to configuration changes
453    pub async fn subscribe_configuration<S>(
454        &mut self,
455        store_name: S,
456        keys: Vec<S>,
457        metadata: Option<HashMap<String, String>>,
458    ) -> Result<Streaming<SubscribeConfigurationResponse>, Error>
459    where
460        S: Into<String>,
461    {
462        let request = SubscribeConfigurationRequest {
463            store_name: store_name.into(),
464            keys: keys.into_iter().map(|key| key.into()).collect(),
465            metadata: metadata.unwrap_or_default(),
466        };
467        self.0.subscribe_configuration(request).await
468    }
469
470    /// Unsubscribe from configuration changes
471    pub async fn unsubscribe_configuration<S>(
472        &mut self,
473        store_name: S,
474        id: S,
475    ) -> Result<UnsubscribeConfigurationResponse, Error>
476    where
477        S: Into<String>,
478    {
479        let request = UnsubscribeConfigurationRequest {
480            id: id.into(),
481            store_name: store_name.into(),
482        };
483        self.0.unsubscribe_configuration(request).await
484    }
485
486    /// Encrypt binary data using Dapr. returns `Vec<StreamPayload>` to be used in decrypt method
487    ///
488    /// # Arguments
489    ///
490    /// * `payload` - ReaderStream to the data to encrypt
491    /// * `request_option` - Encryption request options.
492    pub async fn encrypt<R>(
493        &mut self,
494        payload: ReaderStream<R>,
495        request_options: EncryptRequestOptions,
496    ) -> Result<Vec<StreamPayload>, Status>
497    where
498        R: AsyncRead + Send,
499    {
500        // have to have it as a reference for the async move below
501        let request_options = &Some(request_options);
502        let requested_items: Vec<EncryptRequest> = payload
503            .0
504            .enumerate()
505            .fold(vec![], |mut init, (i, bytes)| async move {
506                let stream_payload = StreamPayload {
507                    data: bytes.unwrap().to_vec(),
508                    seq: 0,
509                };
510                if i == 0 {
511                    init.push(EncryptRequest {
512                        options: request_options.clone(),
513                        payload: Some(stream_payload),
514                    });
515                } else {
516                    init.push(EncryptRequest {
517                        options: None,
518                        payload: Some(stream_payload),
519                    });
520                }
521                init
522            })
523            .await;
524        self.0.encrypt(requested_items).await
525    }
526
527    /// Decrypt binary data using Dapr. returns `Vec<u8>`.
528    ///
529    /// # Arguments
530    ///
531    /// * `encrypted` - Encrypted data usually returned from encrypted, `Vec<StreamPayload>`
532    /// * `options` - Decryption request options.
533    pub async fn decrypt(
534        &mut self,
535        encrypted: Vec<StreamPayload>,
536        options: DecryptRequestOptions,
537    ) -> Result<Vec<u8>, Status> {
538        let requested_items: Vec<DecryptRequest> = encrypted
539            .iter()
540            .enumerate()
541            .map(|(i, item)| {
542                if i == 0 {
543                    DecryptRequest {
544                        options: Some(options.clone()),
545                        payload: Some(item.clone()),
546                    }
547                } else {
548                    DecryptRequest {
549                        options: None,
550                        payload: Some(item.clone()),
551                    }
552                }
553            })
554            .collect();
555        self.0.decrypt(requested_items).await
556    }
557
558    /// Schedules a job with the Dapr Distributed Scheduler
559    ///
560    /// # Arguments
561    ///
562    /// * job - The job to schedule
563    /// * overwrite - Optional flag to overwrite an existing job with the same name
564    pub async fn schedule_job_alpha1(
565        &mut self,
566        job: Job,
567        overwrite: Option<bool>,
568    ) -> Result<ScheduleJobResponse, Error> {
569        let request = ScheduleJobRequest {
570            job: Some(job.clone()),
571            overwrite: overwrite.unwrap_or(false),
572        };
573        self.0.schedule_job_alpha1(request).await
574    }
575
576    /// Retrieves a scheduled job from the Dapr Distributed Scheduler
577    ///
578    /// # Arguments
579    ///
580    /// * name - The name of the job to retrieve
581    pub async fn get_job_alpha1(&mut self, name: &str) -> Result<GetJobResponse, Error> {
582        let request = GetJobRequest {
583            name: name.to_string(),
584        };
585        self.0.get_job_alpha1(request).await
586    }
587
588    /// Deletes a scheduled job from the Dapr Distributed Scheduler
589    ///
590    /// # Arguments
591    ///
592    /// * name - The name of the job to delete
593    pub async fn delete_job_alpha1(&mut self, name: &str) -> Result<DeleteJobResponse, Error> {
594        let request = DeleteJobRequest {
595            name: name.to_string(),
596        };
597        self.0.delete_job_alpha1(request).await
598    }
599
600    /// Converse with an LLM
601    ///
602    /// # Arguments
603    ///
604    /// * ConversationRequest - The request containing inputs to send to the LLM
605    pub async fn converse_alpha1(
606        &mut self,
607        request: ConversationRequest,
608    ) -> Result<ConversationResponse, Error> {
609        self.0.converse_alpha1(request).await
610    }
611}
612
613#[async_trait]
614pub trait DaprInterface: Sized {
615    async fn connect(addr: String) -> Result<Self, Error>;
616    async fn publish_event(&mut self, request: PublishEventRequest) -> Result<(), Error>;
617    async fn invoke_service(
618        &mut self,
619        request: InvokeServiceRequest,
620    ) -> Result<InvokeServiceResponse, Error>;
621    async fn invoke_binding(
622        &mut self,
623        request: InvokeBindingRequest,
624    ) -> Result<InvokeBindingResponse, Error>;
625    async fn get_secret(&mut self, request: GetSecretRequest) -> Result<GetSecretResponse, Error>;
626    async fn get_bulk_secret(
627        &mut self,
628        request: GetBulkSecretRequest,
629    ) -> Result<GetBulkSecretResponse, Error>;
630    async fn get_state(&mut self, request: GetStateRequest) -> Result<GetStateResponse, Error>;
631    async fn save_state(&mut self, request: SaveStateRequest) -> Result<(), Error>;
632    async fn query_state_alpha1(
633        &mut self,
634        request: QueryStateRequest,
635    ) -> Result<QueryStateResponse, Error>;
636    async fn delete_state(&mut self, request: DeleteStateRequest) -> Result<(), Error>;
637    async fn delete_bulk_state(&mut self, request: DeleteBulkStateRequest) -> Result<(), Error>;
638    async fn set_metadata(&mut self, request: SetMetadataRequest) -> Result<(), Error>;
639    async fn get_metadata(&mut self) -> Result<GetMetadataResponse, Error>;
640    async fn invoke_actor(
641        &mut self,
642        request: InvokeActorRequest,
643    ) -> Result<InvokeActorResponse, Error>;
644    async fn get_configuration(
645        &mut self,
646        request: GetConfigurationRequest,
647    ) -> Result<GetConfigurationResponse, Error>;
648    async fn subscribe_configuration(
649        &mut self,
650        request: SubscribeConfigurationRequest,
651    ) -> Result<Streaming<SubscribeConfigurationResponse>, Error>;
652    async fn unsubscribe_configuration(
653        &mut self,
654        request: UnsubscribeConfigurationRequest,
655    ) -> Result<UnsubscribeConfigurationResponse, Error>;
656
657    async fn encrypt(&mut self, payload: Vec<EncryptRequest>)
658        -> Result<Vec<StreamPayload>, Status>;
659
660    async fn decrypt(&mut self, payload: Vec<DecryptRequest>) -> Result<Vec<u8>, Status>;
661
662    async fn schedule_job_alpha1(
663        &mut self,
664        request: ScheduleJobRequest,
665    ) -> Result<ScheduleJobResponse, Error>;
666
667    async fn get_job_alpha1(&mut self, request: GetJobRequest) -> Result<GetJobResponse, Error>;
668
669    async fn delete_job_alpha1(
670        &mut self,
671        request: DeleteJobRequest,
672    ) -> Result<DeleteJobResponse, Error>;
673
674    async fn converse_alpha1(
675        &mut self,
676        request: ConversationRequest,
677    ) -> Result<ConversationResponse, Error>;
678}
679
680#[async_trait]
681impl DaprInterface for dapr_v1::dapr_client::DaprClient<TonicChannel> {
682    async fn connect(addr: String) -> Result<Self, Error> {
683        Ok(dapr_v1::dapr_client::DaprClient::connect(addr).await?)
684    }
685
686    async fn publish_event(&mut self, request: PublishEventRequest) -> Result<(), Error> {
687        self.publish_event(Request::new(request))
688            .await?
689            .into_inner();
690        Ok(())
691    }
692
693    async fn invoke_service(
694        &mut self,
695        request: InvokeServiceRequest,
696    ) -> Result<InvokeServiceResponse, Error> {
697        Ok(self
698            .invoke_service(Request::new(request))
699            .await?
700            .into_inner())
701    }
702
703    async fn invoke_binding(
704        &mut self,
705        request: InvokeBindingRequest,
706    ) -> Result<InvokeBindingResponse, Error> {
707        Ok(self
708            .invoke_binding(Request::new(request))
709            .await?
710            .into_inner())
711    }
712
713    async fn get_secret(&mut self, request: GetSecretRequest) -> Result<GetSecretResponse, Error> {
714        Ok(self.get_secret(Request::new(request)).await?.into_inner())
715    }
716
717    async fn get_bulk_secret(
718        &mut self,
719        request: GetBulkSecretRequest,
720    ) -> Result<GetBulkSecretResponse, Error> {
721        Ok(self
722            .get_bulk_secret(Request::new(request))
723            .await?
724            .into_inner())
725    }
726
727    async fn get_state(&mut self, request: GetStateRequest) -> Result<GetStateResponse, Error> {
728        Ok(self.get_state(Request::new(request)).await?.into_inner())
729    }
730
731    async fn save_state(&mut self, request: SaveStateRequest) -> Result<(), Error> {
732        self.save_state(Request::new(request)).await?.into_inner();
733        Ok(())
734    }
735
736    async fn query_state_alpha1(
737        &mut self,
738        request: QueryStateRequest,
739    ) -> Result<QueryStateResponse, Error> {
740        Ok(self
741            .query_state_alpha1(Request::new(request))
742            .await?
743            .into_inner())
744    }
745
746    async fn delete_state(&mut self, request: DeleteStateRequest) -> Result<(), Error> {
747        self.delete_state(Request::new(request)).await?.into_inner();
748        Ok(())
749    }
750
751    async fn delete_bulk_state(&mut self, request: DeleteBulkStateRequest) -> Result<(), Error> {
752        self.delete_bulk_state(Request::new(request))
753            .await?
754            .into_inner();
755        Ok(())
756    }
757
758    async fn set_metadata(&mut self, request: SetMetadataRequest) -> Result<(), Error> {
759        self.set_metadata(Request::new(request)).await?.into_inner();
760        Ok(())
761    }
762
763    async fn get_metadata(&mut self) -> Result<GetMetadataResponse, Error> {
764        Ok(self.get_metadata(GetMetadataRequest {}).await?.into_inner())
765    }
766
767    async fn invoke_actor(
768        &mut self,
769        request: InvokeActorRequest,
770    ) -> Result<InvokeActorResponse, Error> {
771        Ok(self.invoke_actor(Request::new(request)).await?.into_inner())
772    }
773
774    async fn get_configuration(
775        &mut self,
776        request: GetConfigurationRequest,
777    ) -> Result<GetConfigurationResponse, Error> {
778        Ok(self
779            .get_configuration(Request::new(request))
780            .await?
781            .into_inner())
782    }
783
784    async fn subscribe_configuration(
785        &mut self,
786        request: SubscribeConfigurationRequest,
787    ) -> Result<Streaming<SubscribeConfigurationResponse>, Error> {
788        Ok(self
789            .subscribe_configuration(Request::new(request))
790            .await?
791            .into_inner())
792    }
793
794    async fn unsubscribe_configuration(
795        &mut self,
796        request: UnsubscribeConfigurationRequest,
797    ) -> Result<UnsubscribeConfigurationResponse, Error> {
798        Ok(self
799            .unsubscribe_configuration(Request::new(request))
800            .await?
801            .into_inner())
802    }
803
804    /// Encrypt binary data using Dapr. returns `Vec<StreamPayload>` to be used in decrypt method
805    ///
806    /// # Arguments
807    ///
808    /// * `payload` - ReaderStream to the data to encrypt
809    /// * `request_option` - Encryption request options.
810    async fn encrypt(
811        &mut self,
812        request: Vec<EncryptRequest>,
813    ) -> Result<Vec<StreamPayload>, Status> {
814        let request = Request::new(tokio_stream::iter(request));
815        let stream = self.encrypt_alpha1(request).await?;
816        let mut stream = stream.into_inner();
817        let mut return_data = vec![];
818        while let Some(resp) = stream.next().await {
819            if let Ok(resp) = resp {
820                if let Some(data) = resp.payload {
821                    return_data.push(data)
822                }
823            }
824        }
825        Ok(return_data)
826    }
827
828    /// Decrypt binary data using Dapr. returns `Vec<u8>`.
829    ///
830    /// # Arguments
831    ///
832    /// * `encrypted` - Encrypted data usually returned from encrypted, `Vec<StreamPayload>`
833    /// * `options` - Decryption request options.
834    async fn decrypt(&mut self, request: Vec<DecryptRequest>) -> Result<Vec<u8>, Status> {
835        let request = Request::new(tokio_stream::iter(request));
836        let stream = self.decrypt_alpha1(request).await?;
837        let mut stream = stream.into_inner();
838        let mut data = vec![];
839        while let Some(resp) = stream.next().await {
840            if let Ok(resp) = resp {
841                if let Some(mut payload) = resp.payload {
842                    data.append(payload.data.as_mut())
843                }
844            }
845        }
846        Ok(data)
847    }
848
849    async fn schedule_job_alpha1(
850        &mut self,
851        request: ScheduleJobRequest,
852    ) -> Result<ScheduleJobResponse, Error> {
853        Ok(self.schedule_job_alpha1(request).await?.into_inner())
854    }
855
856    async fn get_job_alpha1(&mut self, request: GetJobRequest) -> Result<GetJobResponse, Error> {
857        Ok(self
858            .get_job_alpha1(Request::new(request))
859            .await?
860            .into_inner())
861    }
862
863    async fn delete_job_alpha1(
864        &mut self,
865        request: DeleteJobRequest,
866    ) -> Result<DeleteJobResponse, Error> {
867        Ok(self
868            .delete_job_alpha1(Request::new(request))
869            .await?
870            .into_inner())
871    }
872
873    async fn converse_alpha1(
874        &mut self,
875        request: ConversationRequest,
876    ) -> Result<ConversationResponse, Error> {
877        Ok(self
878            .converse_alpha1(Request::new(request))
879            .await?
880            .into_inner())
881    }
882}
883
884/// A request from invoking a service
885pub type InvokeServiceRequest = dapr_v1::InvokeServiceRequest;
886
887/// A response from invoking a service
888pub type InvokeServiceResponse = common_v1::InvokeResponse;
889
890/// A request from invoking a binding
891pub type InvokeBindingRequest = dapr_v1::InvokeBindingRequest;
892
893/// A reponse from invoking a binding
894pub type InvokeBindingResponse = dapr_v1::InvokeBindingResponse;
895
896/// A request for publishing event
897pub type PublishEventRequest = dapr_v1::PublishEventRequest;
898
899/// A request for getting state
900pub type GetStateRequest = dapr_v1::GetStateRequest;
901
902/// A response from getting state
903pub type GetStateResponse = dapr_v1::GetStateResponse;
904
905/// A request for saving state
906pub type SaveStateRequest = dapr_v1::SaveStateRequest;
907
908/// A state item
909pub type StateItem = common_v1::StateItem;
910
911/// State options
912pub type StateOptions = common_v1::StateOptions;
913
914/// Etag identifier
915pub type Etag = common_v1::Etag;
916
917/// A request for querying state
918pub type QueryStateRequest = dapr_v1::QueryStateRequest;
919
920/// A response from querying state
921pub type QueryStateResponse = dapr_v1::QueryStateResponse;
922
923/// A request for deleting state
924pub type DeleteStateRequest = dapr_v1::DeleteStateRequest;
925
926/// A request for deleting bulk state
927pub type DeleteBulkStateRequest = dapr_v1::DeleteBulkStateRequest;
928
929/// A request for getting secret
930pub type GetSecretRequest = dapr_v1::GetSecretRequest;
931
932/// A response from getting secret
933pub type GetSecretResponse = dapr_v1::GetSecretResponse;
934
935/// A request for getting bulk secrets
936pub type GetBulkSecretRequest = dapr_v1::GetBulkSecretRequest;
937
938/// A response for getting bulk secrets
939pub type GetBulkSecretResponse = dapr_v1::GetBulkSecretResponse;
940
941/// A response from getting metadata
942pub type GetMetadataResponse = dapr_v1::GetMetadataResponse;
943
944/// A request for getting metadata
945pub type GetMetadataRequest = dapr_v1::GetMetadataRequest;
946
947/// A request for setting metadata
948pub type SetMetadataRequest = dapr_v1::SetMetadataRequest;
949
950/// A request for invoking an actor
951pub type InvokeActorRequest = dapr_v1::InvokeActorRequest;
952
953/// A response from invoking an actor
954pub type InvokeActorResponse = dapr_v1::InvokeActorResponse;
955/// A request for getting configuration
956pub type GetConfigurationRequest = dapr_v1::GetConfigurationRequest;
957
958/// A response from getting configuration
959pub type GetConfigurationResponse = dapr_v1::GetConfigurationResponse;
960
961/// A request for subscribing to configuration changes
962pub type SubscribeConfigurationRequest = dapr_v1::SubscribeConfigurationRequest;
963
964/// A response from subscribing tto configuration changes
965pub type SubscribeConfigurationResponse = dapr_v1::SubscribeConfigurationResponse;
966
967/// A request for unsubscribing from configuration changes
968pub type UnsubscribeConfigurationRequest = dapr_v1::UnsubscribeConfigurationRequest;
969
970/// A response from unsubscribing from configuration changes
971pub type UnsubscribeConfigurationResponse = dapr_v1::UnsubscribeConfigurationResponse;
972
973/// A tonic based gRPC client
974pub type TonicClient = dapr_v1::dapr_client::DaprClient<TonicChannel>;
975
976/// Encryption gRPC request
977pub type EncryptRequest = crate::dapr::proto::runtime::v1::EncryptRequest;
978
979/// Decrypt gRPC request
980pub type DecryptRequest = crate::dapr::proto::runtime::v1::DecryptRequest;
981
982/// Encryption request options
983pub type EncryptRequestOptions = crate::dapr::proto::runtime::v1::EncryptRequestOptions;
984
985/// Decryption request options
986pub type DecryptRequestOptions = crate::dapr::proto::runtime::v1::DecryptRequestOptions;
987
988/// The basic job structure
989pub type Job = crate::dapr::proto::runtime::v1::Job;
990
991/// A failure policy for a job
992pub type JobFailurePolicy = crate::dapr::proto::common::v1::JobFailurePolicy;
993
994/// A request to schedule a job
995pub type ScheduleJobRequest = crate::dapr::proto::runtime::v1::ScheduleJobRequest;
996
997/// A response from a schedule job request
998pub type ScheduleJobResponse = crate::dapr::proto::runtime::v1::ScheduleJobResponse;
999
1000/// A request to get a job
1001pub type GetJobRequest = crate::dapr::proto::runtime::v1::GetJobRequest;
1002
1003/// A response from a get job request
1004pub type GetJobResponse = crate::dapr::proto::runtime::v1::GetJobResponse;
1005
1006/// A request to delete a job
1007pub type DeleteJobRequest = crate::dapr::proto::runtime::v1::DeleteJobRequest;
1008
1009/// A response from a delete job request
1010pub type DeleteJobResponse = crate::dapr::proto::runtime::v1::DeleteJobResponse;
1011
1012/// A request to conversate with an LLM
1013pub type ConversationRequest = crate::dapr::proto::runtime::v1::ConversationRequest;
1014
1015/// A response from conversating with an LLM
1016pub type ConversationResponse = crate::dapr::proto::runtime::v1::ConversationResponse;
1017
1018/// A result from an interacting with a LLM
1019pub type ConversationResult = crate::dapr::proto::runtime::v1::ConversationResult;
1020
1021/// An input to the conversation
1022pub type ConversationInput = crate::dapr::proto::runtime::v1::ConversationInput;
1023
1024type StreamPayload = crate::dapr::proto::common::v1::StreamPayload;
1025impl<K> From<(K, Vec<u8>)> for common_v1::StateItem
1026where
1027    K: Into<String>,
1028{
1029    fn from((key, value): (K, Vec<u8>)) -> Self {
1030        common_v1::StateItem {
1031            key: key.into(),
1032            value,
1033            ..Default::default()
1034        }
1035    }
1036}
1037
1038pub struct ReaderStream<T>(tokio_util::io::ReaderStream<T>);
1039
1040impl<T: AsyncRead> ReaderStream<T> {
1041    pub fn new(data: T) -> Self {
1042        ReaderStream(tokio_util::io::ReaderStream::new(data))
1043    }
1044}
1045
1046pub struct JobBuilder {
1047    schedule: Option<String>,
1048    data: Option<Any>,
1049    name: String,
1050    ttl: Option<String>,
1051    repeats: Option<u32>,
1052    due_time: Option<String>,
1053    failure_policy: Option<JobFailurePolicy>,
1054}
1055
1056impl JobBuilder {
1057    /// Create a new Job to be scheduled
1058    pub fn new(name: &str) -> Self {
1059        JobBuilder {
1060            schedule: None,
1061            data: None,
1062            name: name.to_string(),
1063            ttl: None,
1064            repeats: None,
1065            due_time: None,
1066            failure_policy: None,
1067        }
1068    }
1069
1070    pub fn with_schedule(mut self, schedule: &str) -> Self {
1071        self.schedule = Some(schedule.into());
1072        self
1073    }
1074
1075    pub fn with_data(mut self, data: Any) -> Self {
1076        self.data = Some(data);
1077        self
1078    }
1079
1080    pub fn with_ttl(mut self, ttl: &str) -> Self {
1081        self.ttl = Some(ttl.into());
1082        self
1083    }
1084
1085    pub fn with_repeats(mut self, repeats: u32) -> Self {
1086        self.repeats = Some(repeats);
1087        self
1088    }
1089
1090    pub fn with_due_time(mut self, due_time: &str) -> Self {
1091        self.due_time = Some(due_time.into());
1092        self
1093    }
1094
1095    pub fn with_failure_policy(mut self, policy: JobFailurePolicy) -> Self {
1096        self.failure_policy = Some(policy);
1097        self
1098    }
1099
1100    pub fn build(self) -> Job {
1101        Job {
1102            schedule: self.schedule,
1103            data: self.data,
1104            name: self.name,
1105            ttl: self.ttl,
1106            repeats: self.repeats,
1107            due_time: self.due_time,
1108            failure_policy: self.failure_policy,
1109        }
1110    }
1111}
1112
1113// Enum for a job failure policy
1114pub enum JobFailurePolicyType {
1115    Drop {},
1116    Constant {},
1117}
1118
1119pub struct JobFailurePolicyBuilder {
1120    policy: JobFailurePolicyType,
1121    pub retry_interval: Option<Duration>,
1122    pub max_retries: Option<u32>,
1123}
1124
1125impl JobFailurePolicyBuilder {
1126    pub fn new(policy: JobFailurePolicyType) -> Self {
1127        JobFailurePolicyBuilder {
1128            policy,
1129            retry_interval: None,
1130            max_retries: None,
1131        }
1132    }
1133
1134    pub fn with_retry_interval(mut self, interval: Duration) -> Self {
1135        // Convert interval string (e.g., "5s") to ProstDuration
1136        self.retry_interval = Some(interval);
1137        self
1138    }
1139
1140    pub fn with_max_retries(mut self, max_retries: u32) -> Self {
1141        self.max_retries = Some(max_retries);
1142        self
1143    }
1144
1145    pub fn build(self) -> common_v1::JobFailurePolicy {
1146        match self.policy {
1147            JobFailurePolicyType::Drop {} => common_v1::JobFailurePolicy {
1148                policy: Some(Policy::Drop(Default::default())),
1149            },
1150            JobFailurePolicyType::Constant {} => JobFailurePolicy {
1151                policy: Some(Policy::Constant(JobFailurePolicyConstant {
1152                    interval: self.retry_interval.map(|interval| {
1153                        prost_types::Duration::try_from(interval)
1154                            .expect("Failed to convert Duration")
1155                    }),
1156                    max_retries: self.max_retries,
1157                })),
1158            },
1159        }
1160    }
1161}
1162
1163pub struct ConversationInputBuilder {
1164    content: String,
1165    role: Option<String>,
1166    scrub_pii: Option<bool>,
1167}
1168
1169impl ConversationInputBuilder {
1170    pub fn new(message: &str) -> Self {
1171        ConversationInputBuilder {
1172            content: message.to_string(),
1173            role: None,
1174            scrub_pii: None,
1175        }
1176    }
1177
1178    pub fn build(self) -> ConversationInput {
1179        ConversationInput {
1180            content: self.content,
1181            role: self.role,
1182            scrub_pii: self.scrub_pii,
1183        }
1184    }
1185}
1186
1187pub struct ConversationRequestBuilder {
1188    name: String,
1189    context_id: Option<String>,
1190    inputs: Vec<ConversationInput>,
1191    parameters: HashMap<String, Any>,
1192    metadata: HashMap<String, String>,
1193    scrub_pii: Option<bool>,
1194    temperature: Option<f64>,
1195}
1196impl ConversationRequestBuilder {
1197    pub fn new(name: &str, inputs: Vec<ConversationInput>) -> Self {
1198        ConversationRequestBuilder {
1199            name: name.to_string(),
1200            context_id: None,
1201            inputs,
1202            parameters: Default::default(),
1203            metadata: Default::default(),
1204            scrub_pii: None,
1205            temperature: None,
1206        }
1207    }
1208
1209    pub fn build(self) -> ConversationRequest {
1210        ConversationRequest {
1211            name: self.name,
1212            context_id: self.context_id,
1213            inputs: self.inputs,
1214            parameters: self.parameters,
1215            metadata: self.metadata,
1216            scrub_pii: self.scrub_pii,
1217            temperature: self.temperature,
1218        }
1219    }
1220}