1use crate::error::{ErrorData, Result};
2use crate::traits::{Binding, Kv, PutOptions, ScanResult};
3use alien_error::{AlienError, Context, IntoAlienError};
4use alien_gcp_clients::firestore::{
5 CollectionSelector, Direction, Document, FieldFilter, FieldFilterOperator, FieldReference,
6 Filter, FirestoreApi, FirestoreClient, Order, QueryType, RunQueryRequest, StructuredQuery,
7 Value,
8};
9use async_trait::async_trait;
10use base64::{self, Engine};
11use chrono::{DateTime, Utc};
12use serde::{Deserialize, Serialize};
13use std::collections::HashMap;
14use std::fmt::{Debug, Formatter};
15
16use super::{validate_key, validate_value};
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
20struct KvDocument {
21 value: String, created_at: DateTime<Utc>,
23 expires_at: Option<DateTime<Utc>>, }
25
26pub struct GcpFirestoreKv {
28 client: FirestoreClient,
29 project_id: String,
30 database_id: String,
31 collection_name: String,
32}
33
34impl Debug for GcpFirestoreKv {
35 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
36 f.debug_struct("GcpFirestoreKv")
37 .field("project_id", &self.project_id)
38 .field("database_id", &self.database_id)
39 .field("collection_name", &self.collection_name)
40 .finish()
41 }
42}
43
44impl GcpFirestoreKv {
45 pub fn new(
46 client: FirestoreClient,
47 project_id: String,
48 database_id: String,
49 collection_name: String,
50 ) -> Result<Self> {
51 Ok(Self {
52 client,
53 project_id,
54 database_id,
55 collection_name,
56 })
57 }
58
59 fn is_expired(&self, expires_at: Option<DateTime<Utc>>) -> bool {
61 if let Some(expiry) = expires_at {
62 Utc::now() >= expiry
63 } else {
64 false
65 }
66 }
67
68 fn kv_document_to_firestore(&self, _key: &str, kv_doc: &KvDocument) -> Document {
70 let mut fields = HashMap::new();
71
72 fields.insert(
73 "value".to_string(),
74 Value::StringValue(kv_doc.value.clone()),
75 );
76 fields.insert(
77 "created_at".to_string(),
78 Value::TimestampValue(kv_doc.created_at.to_rfc3339()),
79 );
80
81 if let Some(expires_at) = kv_doc.expires_at {
82 fields.insert(
83 "expires_at".to_string(),
84 Value::TimestampValue(expires_at.to_rfc3339()),
85 );
86 }
87
88 Document::builder().fields(fields).build()
89 }
90
91 fn kv_document_to_firestore_with_name(&self, key: &str, kv_doc: &KvDocument) -> Document {
93 let mut fields = HashMap::new();
94
95 fields.insert(
96 "value".to_string(),
97 Value::StringValue(kv_doc.value.clone()),
98 );
99 fields.insert(
100 "created_at".to_string(),
101 Value::TimestampValue(kv_doc.created_at.to_rfc3339()),
102 );
103
104 if let Some(expires_at) = kv_doc.expires_at {
105 fields.insert(
106 "expires_at".to_string(),
107 Value::TimestampValue(expires_at.to_rfc3339()),
108 );
109 }
110
111 Document::builder()
112 .name(format!(
113 "projects/{}/databases/{}/documents/{}/{}",
114 self.project_id, self.database_id, self.collection_name, key
115 ))
116 .fields(fields)
117 .build()
118 }
119
120 fn firestore_to_kv_document(&self, doc: &Document) -> Result<KvDocument> {
122 let fields = doc.fields.as_ref().ok_or_else(|| {
123 AlienError::new(ErrorData::UnexpectedResponseFormat {
124 provider: "gcp".to_string(),
125 binding_name: "firestore".to_string(),
126 field: "fields".to_string(),
127 response_json: serde_json::to_string(doc).unwrap_or_default(),
128 })
129 })?;
130
131 let value = match fields.get("value") {
132 Some(Value::StringValue(v)) => v.clone(),
133 _ => {
134 return Err(AlienError::new(ErrorData::UnexpectedResponseFormat {
135 provider: "gcp".to_string(),
136 binding_name: "firestore".to_string(),
137 field: "value".to_string(),
138 response_json: serde_json::to_string(doc).unwrap_or_default(),
139 }))
140 }
141 };
142
143 let created_at = match fields.get("created_at") {
144 Some(Value::TimestampValue(t)) => DateTime::parse_from_rfc3339(t)
145 .map_err(|_| {
146 AlienError::new(ErrorData::UnexpectedResponseFormat {
147 provider: "gcp".to_string(),
148 binding_name: "firestore".to_string(),
149 field: "created_at".to_string(),
150 response_json: serde_json::to_string(doc).unwrap_or_default(),
151 })
152 })?
153 .with_timezone(&Utc),
154 _ => {
155 return Err(AlienError::new(ErrorData::UnexpectedResponseFormat {
156 provider: "gcp".to_string(),
157 binding_name: "firestore".to_string(),
158 field: "created_at".to_string(),
159 response_json: serde_json::to_string(doc).unwrap_or_default(),
160 }))
161 }
162 };
163
164 let expires_at = match fields.get("expires_at") {
165 Some(Value::TimestampValue(t)) => Some(
166 DateTime::parse_from_rfc3339(t)
167 .map_err(|_| {
168 AlienError::new(ErrorData::UnexpectedResponseFormat {
169 provider: "gcp".to_string(),
170 binding_name: "firestore".to_string(),
171 field: "expires_at".to_string(),
172 response_json: serde_json::to_string(doc).unwrap_or_default(),
173 })
174 })?
175 .with_timezone(&Utc),
176 ),
177 _ => None,
178 };
179
180 Ok(KvDocument {
181 value,
182 created_at,
183 expires_at,
184 })
185 }
186}
187
188impl Binding for GcpFirestoreKv {}
189
190#[async_trait]
191impl Kv for GcpFirestoreKv {
192 async fn get(&self, key: &str) -> Result<Option<Vec<u8>>> {
193 validate_key(key)?;
194
195 let document_id = key;
196 let document_path = format!("{}/{}", self.collection_name, document_id);
197
198 match self
199 .client
200 .get_document(self.database_id.clone(), document_path, None, None, None)
201 .await
202 {
203 Ok(doc) => {
204 let kv_doc = self.firestore_to_kv_document(&doc)?;
205
206 if self.is_expired(kv_doc.expires_at) {
208 return Ok(None); }
210
211 let value = base64::engine::general_purpose::STANDARD
212 .decode(&kv_doc.value)
213 .into_alien_error()
214 .context(ErrorData::KvOperationFailed {
215 operation: "get".to_string(),
216 key: key.to_string(),
217 reason: "Failed to decode base64 value".to_string(),
218 })?;
219
220 Ok(Some(value))
221 }
222 Err(e) => {
223 match &e.error {
225 Some(alien_client_core::ErrorData::RemoteResourceNotFound { .. }) => {
226 Ok(None) }
228 _ => Err(crate::error::map_cloud_client_error(
229 e,
230 "Failed to get Firestore document".to_string(),
231 Some(key.to_string()),
232 )),
233 }
234 }
235 }
236 }
237
238 async fn put(&self, key: &str, value: Vec<u8>, options: Option<PutOptions>) -> Result<bool> {
239 validate_key(key)?;
240 validate_value(&value)?;
241
242 let options = options.unwrap_or_default();
243
244 let encoded_value = base64::engine::general_purpose::STANDARD.encode(&value);
245 let kv_doc = KvDocument {
246 value: encoded_value,
247 created_at: Utc::now(),
248 expires_at: options.ttl.map(|d| Utc::now() + d),
249 };
250
251 let document = self.kv_document_to_firestore(key, &kv_doc);
252
253 if options.if_not_exists {
254 let document_id = key.to_string();
255 match self
256 .client
257 .create_document(
258 self.database_id.clone(),
259 self.collection_name.clone(),
260 Some(document_id),
261 document,
262 None,
263 )
264 .await
265 {
266 Ok(_) => Ok(true),
267 Err(e) => {
268 match &e.error {
270 Some(alien_client_core::ErrorData::RemoteResourceConflict { .. }) => {
271 Ok(false)
272 }
273 _ => Err(crate::error::map_cloud_client_error(
274 e,
275 "Failed to create Firestore document".to_string(),
276 Some(key.to_string()),
277 )),
278 }
279 }
280 }
281 } else {
282 let document_id = key;
283 let document_path = format!("{}/{}", self.collection_name, document_id);
284 let document_with_name = self.kv_document_to_firestore_with_name(key, &kv_doc);
285
286 self.client
287 .patch_document(
288 self.database_id.clone(),
289 document_path,
290 document_with_name,
291 None,
292 None,
293 None,
294 )
295 .await
296 .map_err(|e| {
297 crate::error::map_cloud_client_error(
298 e,
299 "Failed to patch Firestore document".to_string(),
300 Some(key.to_string()),
301 )
302 })?;
303
304 Ok(true)
305 }
306 }
307
308 async fn delete(&self, key: &str) -> Result<()> {
309 validate_key(key)?;
310
311 let document_id = key;
312 let document_path = format!("{}/{}", self.collection_name, document_id);
313
314 self.client
315 .delete_document(self.database_id.clone(), document_path, None)
316 .await
317 .map_err(|e| {
318 crate::error::map_cloud_client_error(
319 e,
320 "Failed to delete Firestore document".to_string(),
321 Some(key.to_string()),
322 )
323 })?;
324
325 Ok(())
326 }
327
328 async fn exists(&self, key: &str) -> Result<bool> {
329 validate_key(key)?;
330
331 let document_id = key;
332 let document_path = format!("{}/{}", self.collection_name, document_id);
333
334 match self
335 .client
336 .get_document(self.database_id.clone(), document_path, None, None, None)
337 .await
338 {
339 Ok(doc) => {
340 let kv_doc = self.firestore_to_kv_document(&doc)?;
341
342 Ok(!self.is_expired(kv_doc.expires_at))
344 }
345 Err(e) => {
346 match &e.error {
347 Some(alien_client_core::ErrorData::RemoteResourceNotFound { .. }) => {
348 Ok(false) }
350 _ => Err(crate::error::map_cloud_client_error(
351 e,
352 "Failed to get Firestore document".to_string(),
353 Some(key.to_string()),
354 )),
355 }
356 }
357 }
358 }
359
360 async fn scan_prefix(
361 &self,
362 prefix: &str,
363 limit: Option<usize>,
364 cursor: Option<String>,
365 ) -> Result<ScanResult> {
366 validate_key(prefix)?; let collection_selector = CollectionSelector::builder()
369 .collection_id(self.collection_name.clone())
370 .build();
371
372 let mut structured_query = StructuredQuery::builder()
373 .from(vec![collection_selector])
374 .order_by(vec![Order::builder()
375 .field(
376 FieldReference::builder()
377 .field_path("__name__".to_string())
378 .build(),
379 )
380 .direction(Direction::Ascending)
381 .build()])
382 .build();
383
384 if !prefix.is_empty() {
386 let document_id_prefix = prefix;
387 let prefix_filter = Filter::FieldFilter(
388 FieldFilter::builder()
389 .field(
390 FieldReference::builder()
391 .field_path("__name__".to_string())
392 .build(),
393 )
394 .op(FieldFilterOperator::GreaterThanOrEqual)
395 .value(Value::ReferenceValue(format!(
396 "projects/{}/databases/{}/documents/{}/{}",
397 self.project_id, self.database_id, self.collection_name, document_id_prefix
398 )))
399 .build(),
400 );
401
402 structured_query.r#where = Some(prefix_filter);
403 }
404
405 if let Some(limit) = limit {
406 structured_query.limit = Some(limit as i32);
407 }
408
409 if let Some(ref cursor) = cursor {
410 if let Ok(offset) = cursor.parse::<i32>() {
412 structured_query.offset = Some(offset);
413 }
414 }
415
416 let query_request = RunQueryRequest::builder()
417 .parent(format!(
418 "projects/{}/databases/{}/documents",
419 self.project_id, self.database_id
420 ))
421 .query_type(QueryType::StructuredQuery(structured_query))
422 .build();
423
424 let query_responses = self
425 .client
426 .run_query(self.database_id.clone(), query_request)
427 .await
428 .map_err(|e| {
429 crate::error::map_cloud_client_error(
430 e,
431 "Failed to run Firestore query".to_string(),
432 Some(prefix.to_string()),
433 )
434 })?;
435
436 let items: Vec<(String, Vec<u8>)> = query_responses
437 .iter()
438 .filter_map(|response| {
439 let doc = response.document.as_ref()?;
440 let doc_name = doc.name.as_ref()?;
441
442 let document_id = doc_name.split('/').last()?.to_string();
444
445 let key = document_id;
447
448 if !key.starts_with(prefix) {
450 return None;
451 }
452
453 let kv_doc = self.firestore_to_kv_document(doc).ok()?;
454
455 if self.is_expired(kv_doc.expires_at) {
457 return None; }
459
460 let value = base64::engine::general_purpose::STANDARD
461 .decode(&kv_doc.value)
462 .ok()?;
463 Some((key, value))
464 })
465 .collect();
466
467 let next_cursor = if items.len() == limit.unwrap_or(usize::MAX) {
468 let current_offset = cursor
470 .as_ref()
471 .and_then(|c| c.parse::<usize>().ok())
472 .unwrap_or(0);
473 Some((current_offset + items.len()).to_string())
474 } else {
475 None
476 };
477
478 Ok(ScanResult { items, next_cursor })
479 }
480}