alien_bindings/providers/kv/
grpc.rs1use crate::error::{ErrorData, Result};
2use crate::traits::{Binding, Kv, PutOptions, ScanResult};
3use alien_error::{Context as _, IntoAlienError as _};
4use async_trait::async_trait;
5use std::fmt::{Debug, Formatter};
6use tonic::transport::Channel;
7
/// Client and message types for the `alien_bindings.kv` protobuf package.
pub mod proto {
    // `tonic::include_proto!` includes the Rust code generated for the named package.
    tonic::include_proto!("alien_bindings.kv");
}
12
13use proto::{
14 kv_service_client::KvServiceClient, DeleteRequest, ExistsRequest, GetRequest,
15 PutOptions as ProtoPutOptions, PutRequest, ScanPrefixRequest,
16};
17
/// Key-value binding backed by a remote `KvService` reached over a tonic gRPC channel.
pub struct GrpcKv {
    /// Generated gRPC client; each `Kv` method issues its call on a fresh clone of it.
    client: KvServiceClient<Channel>,
    /// Binding name sent in the `binding_name` field of every request.
    binding_name: String,
}
23
24impl Debug for GrpcKv {
25 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
26 f.debug_struct("GrpcKv")
27 .field("binding_name", &self.binding_name)
28 .finish()
29 }
30}
31
32impl GrpcKv {
33 pub async fn new(binding_name: String, grpc_endpoint: String) -> Result<Self> {
35 let channel = crate::providers::grpc_provider::create_grpc_channel(grpc_endpoint).await?;
36 Self::new_from_channel(channel, binding_name).await
37 }
38
39 pub async fn new_from_channel(channel: Channel, binding_name: String) -> Result<Self> {
41 let client = KvServiceClient::new(channel);
42
43 Ok(Self {
44 client,
45 binding_name,
46 })
47 }
48
49 fn put_options_to_proto(options: Option<PutOptions>) -> Option<ProtoPutOptions> {
51 options.map(|opts| ProtoPutOptions {
52 ttl_seconds: opts.ttl.map(|ttl| ttl.as_secs()),
53 if_not_exists: opts.if_not_exists,
54 })
55 }
56}
57
// `Binding` is implemented with an empty body: no methods are defined or overridden here.
impl Binding for GrpcKv {}
59
60#[async_trait]
61impl Kv for GrpcKv {
62 async fn get(&self, key: &str) -> Result<Option<Vec<u8>>> {
63 let mut client = self.client.clone();
64
65 let request = tonic::Request::new(GetRequest {
66 binding_name: self.binding_name.clone(),
67 key: key.to_string(),
68 });
69
70 let response =
71 client
72 .get(request)
73 .await
74 .into_alien_error()
75 .context(ErrorData::GrpcRequestFailed {
76 service: "KvService".to_string(),
77 method: "get".to_string(),
78 details: format!("Failed to get key: {}", key),
79 })?;
80
81 Ok(response.into_inner().value)
82 }
83
84 async fn put(&self, key: &str, value: Vec<u8>, options: Option<PutOptions>) -> Result<bool> {
85 let mut client = self.client.clone();
86
87 let request = tonic::Request::new(PutRequest {
88 binding_name: self.binding_name.clone(),
89 key: key.to_string(),
90 value,
91 options: Self::put_options_to_proto(options),
92 });
93
94 let response =
95 client
96 .put(request)
97 .await
98 .into_alien_error()
99 .context(ErrorData::GrpcRequestFailed {
100 service: "KvService".to_string(),
101 method: "put".to_string(),
102 details: format!("Failed to put key: {}", key),
103 })?;
104
105 Ok(response.into_inner().success)
106 }
107
108 async fn delete(&self, key: &str) -> Result<()> {
109 let mut client = self.client.clone();
110
111 let request = tonic::Request::new(DeleteRequest {
112 binding_name: self.binding_name.clone(),
113 key: key.to_string(),
114 });
115
116 client
117 .delete(request)
118 .await
119 .into_alien_error()
120 .context(ErrorData::GrpcRequestFailed {
121 service: "KvService".to_string(),
122 method: "delete".to_string(),
123 details: format!("Failed to delete key: {}", key),
124 })?;
125
126 Ok(())
127 }
128
129 async fn exists(&self, key: &str) -> Result<bool> {
130 let mut client = self.client.clone();
131
132 let request = tonic::Request::new(ExistsRequest {
133 binding_name: self.binding_name.clone(),
134 key: key.to_string(),
135 });
136
137 let response = client.exists(request).await.into_alien_error().context(
138 ErrorData::GrpcRequestFailed {
139 service: "KvService".to_string(),
140 method: "exists".to_string(),
141 details: format!("Failed to check exists for key: {}", key),
142 },
143 )?;
144
145 Ok(response.into_inner().exists)
146 }
147
148 async fn scan_prefix(
149 &self,
150 prefix: &str,
151 limit: Option<usize>,
152 cursor: Option<String>,
153 ) -> Result<ScanResult> {
154 let mut client = self.client.clone();
155
156 let request = tonic::Request::new(ScanPrefixRequest {
157 binding_name: self.binding_name.clone(),
158 prefix: prefix.to_string(),
159 limit: limit.map(|l| l as u32),
160 cursor,
161 });
162
163 let response = client
164 .scan_prefix(request)
165 .await
166 .into_alien_error()
167 .context(ErrorData::GrpcRequestFailed {
168 service: "KvService".to_string(),
169 method: "scan_prefix".to_string(),
170 details: format!("Failed to scan prefix: {}", prefix),
171 })?;
172
173 let response_inner = response.into_inner();
174 let items = response_inner
175 .items
176 .into_iter()
177 .map(|item| (item.key, item.value))
178 .collect();
179
180 Ok(ScanResult {
181 items,
182 next_cursor: response_inner.next_cursor,
183 })
184 }
185}