Skip to main content

alien_bindings/providers/kv/
grpc.rs

1use crate::error::{ErrorData, Result};
2use crate::traits::{Binding, Kv, PutOptions, ScanResult};
3use alien_error::{Context as _, IntoAlienError as _};
4use async_trait::async_trait;
5use std::fmt::{Debug, Formatter};
6use tonic::transport::Channel;
7
/// Generated protobuf types for the `alien_bindings.kv` package.
///
/// `tonic::include_proto!` pulls in the code generated for that package; the
/// request messages and `KvServiceClient` used in this file are imported from
/// this module.
pub mod proto {
    tonic::include_proto!("alien_bindings.kv");
}
12
13use proto::{
14    kv_service_client::KvServiceClient, DeleteRequest, ExistsRequest, GetRequest,
15    PutOptions as ProtoPutOptions, PutRequest, ScanPrefixRequest,
16};
17
/// gRPC-based KV implementation that forwards calls to a remote KV service
///
/// Every [`Kv`] operation is turned into a request on `client`, tagged with
/// `binding_name`.
pub struct GrpcKv {
    /// Generated `KvService` client running over a tonic [`Channel`].
    client: KvServiceClient<Channel>,
    /// Binding name copied into the `binding_name` field of every outgoing request.
    binding_name: String,
}
23
24impl Debug for GrpcKv {
25    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
26        f.debug_struct("GrpcKv")
27            .field("binding_name", &self.binding_name)
28            .finish()
29    }
30}
31
32impl GrpcKv {
33    /// Create a new gRPC KV client
34    pub async fn new(binding_name: String, grpc_endpoint: String) -> Result<Self> {
35        let channel = crate::providers::grpc_provider::create_grpc_channel(grpc_endpoint).await?;
36        Self::new_from_channel(channel, binding_name).await
37    }
38
39    /// Create a new gRPC KV client from an existing channel
40    pub async fn new_from_channel(channel: Channel, binding_name: String) -> Result<Self> {
41        let client = KvServiceClient::new(channel);
42
43        Ok(Self {
44            client,
45            binding_name,
46        })
47    }
48
49    /// Convert PutOptions to proto PutOptions
50    fn put_options_to_proto(options: Option<PutOptions>) -> Option<ProtoPutOptions> {
51        options.map(|opts| ProtoPutOptions {
52            ttl_seconds: opts.ttl.map(|ttl| ttl.as_secs()),
53            if_not_exists: opts.if_not_exists,
54        })
55    }
56}
57
58impl Binding for GrpcKv {}
59
60#[async_trait]
61impl Kv for GrpcKv {
62    async fn get(&self, key: &str) -> Result<Option<Vec<u8>>> {
63        let mut client = self.client.clone();
64
65        let request = tonic::Request::new(GetRequest {
66            binding_name: self.binding_name.clone(),
67            key: key.to_string(),
68        });
69
70        let response =
71            client
72                .get(request)
73                .await
74                .into_alien_error()
75                .context(ErrorData::GrpcRequestFailed {
76                    service: "KvService".to_string(),
77                    method: "get".to_string(),
78                    details: format!("Failed to get key: {}", key),
79                })?;
80
81        Ok(response.into_inner().value)
82    }
83
84    async fn put(&self, key: &str, value: Vec<u8>, options: Option<PutOptions>) -> Result<bool> {
85        let mut client = self.client.clone();
86
87        let request = tonic::Request::new(PutRequest {
88            binding_name: self.binding_name.clone(),
89            key: key.to_string(),
90            value,
91            options: Self::put_options_to_proto(options),
92        });
93
94        let response =
95            client
96                .put(request)
97                .await
98                .into_alien_error()
99                .context(ErrorData::GrpcRequestFailed {
100                    service: "KvService".to_string(),
101                    method: "put".to_string(),
102                    details: format!("Failed to put key: {}", key),
103                })?;
104
105        Ok(response.into_inner().success)
106    }
107
108    async fn delete(&self, key: &str) -> Result<()> {
109        let mut client = self.client.clone();
110
111        let request = tonic::Request::new(DeleteRequest {
112            binding_name: self.binding_name.clone(),
113            key: key.to_string(),
114        });
115
116        client
117            .delete(request)
118            .await
119            .into_alien_error()
120            .context(ErrorData::GrpcRequestFailed {
121                service: "KvService".to_string(),
122                method: "delete".to_string(),
123                details: format!("Failed to delete key: {}", key),
124            })?;
125
126        Ok(())
127    }
128
129    async fn exists(&self, key: &str) -> Result<bool> {
130        let mut client = self.client.clone();
131
132        let request = tonic::Request::new(ExistsRequest {
133            binding_name: self.binding_name.clone(),
134            key: key.to_string(),
135        });
136
137        let response = client.exists(request).await.into_alien_error().context(
138            ErrorData::GrpcRequestFailed {
139                service: "KvService".to_string(),
140                method: "exists".to_string(),
141                details: format!("Failed to check exists for key: {}", key),
142            },
143        )?;
144
145        Ok(response.into_inner().exists)
146    }
147
148    async fn scan_prefix(
149        &self,
150        prefix: &str,
151        limit: Option<usize>,
152        cursor: Option<String>,
153    ) -> Result<ScanResult> {
154        let mut client = self.client.clone();
155
156        let request = tonic::Request::new(ScanPrefixRequest {
157            binding_name: self.binding_name.clone(),
158            prefix: prefix.to_string(),
159            limit: limit.map(|l| l as u32),
160            cursor,
161        });
162
163        let response = client
164            .scan_prefix(request)
165            .await
166            .into_alien_error()
167            .context(ErrorData::GrpcRequestFailed {
168                service: "KvService".to_string(),
169                method: "scan_prefix".to_string(),
170                details: format!("Failed to scan prefix: {}", prefix),
171            })?;
172
173        let response_inner = response.into_inner();
174        let items = response_inner
175            .items
176            .into_iter()
177            .map(|item| (item.key, item.value))
178            .collect();
179
180        Ok(ScanResult {
181            items,
182            next_cursor: response_inner.next_cursor,
183        })
184    }
185}