Skip to main content

alien_bindings/providers/kv/
gcp_firestore.rs

1use crate::error::{ErrorData, Result};
2use crate::traits::{Binding, Kv, PutOptions, ScanResult};
3use alien_error::{AlienError, Context, IntoAlienError};
4use alien_gcp_clients::firestore::{
5    CollectionSelector, Direction, Document, FieldFilter, FieldFilterOperator, FieldReference,
6    Filter, FirestoreApi, FirestoreClient, Order, QueryType, RunQueryRequest, StructuredQuery,
7    Value,
8};
9use async_trait::async_trait;
10use base64::{self, Engine};
11use chrono::{DateTime, Utc};
12use serde::{Deserialize, Serialize};
13use std::collections::HashMap;
14use std::fmt::{Debug, Formatter};
15
16use super::{validate_key, validate_value};
17
18/// Firestore document for KV storage
19#[derive(Debug, Clone, Serialize, Deserialize)]
20struct KvDocument {
21    value: String, // Base64-encoded binary data
22    created_at: DateTime<Utc>,
23    expires_at: Option<DateTime<Utc>>, // For TTL policy
24}
25
26/// GCP Firestore implementation of the KV trait
27pub struct GcpFirestoreKv {
28    client: FirestoreClient,
29    project_id: String,
30    database_id: String,
31    collection_name: String,
32}
33
34impl Debug for GcpFirestoreKv {
35    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
36        f.debug_struct("GcpFirestoreKv")
37            .field("project_id", &self.project_id)
38            .field("database_id", &self.database_id)
39            .field("collection_name", &self.collection_name)
40            .finish()
41    }
42}
43
44impl GcpFirestoreKv {
45    pub fn new(
46        client: FirestoreClient,
47        project_id: String,
48        database_id: String,
49        collection_name: String,
50    ) -> Result<Self> {
51        Ok(Self {
52            client,
53            project_id,
54            database_id,
55            collection_name,
56        })
57    }
58
59    /// Checks if an item has expired based on TTL
60    fn is_expired(&self, expires_at: Option<DateTime<Utc>>) -> bool {
61        if let Some(expiry) = expires_at {
62            Utc::now() >= expiry
63        } else {
64            false
65        }
66    }
67
68    /// Converts a KV document to Firestore Document format
69    fn kv_document_to_firestore(&self, _key: &str, kv_doc: &KvDocument) -> Document {
70        let mut fields = HashMap::new();
71
72        fields.insert(
73            "value".to_string(),
74            Value::StringValue(kv_doc.value.clone()),
75        );
76        fields.insert(
77            "created_at".to_string(),
78            Value::TimestampValue(kv_doc.created_at.to_rfc3339()),
79        );
80
81        if let Some(expires_at) = kv_doc.expires_at {
82            fields.insert(
83                "expires_at".to_string(),
84                Value::TimestampValue(expires_at.to_rfc3339()),
85            );
86        }
87
88        Document::builder().fields(fields).build()
89    }
90
91    /// Converts a KV document to Firestore Document format with name (for updates)
92    fn kv_document_to_firestore_with_name(&self, key: &str, kv_doc: &KvDocument) -> Document {
93        let mut fields = HashMap::new();
94
95        fields.insert(
96            "value".to_string(),
97            Value::StringValue(kv_doc.value.clone()),
98        );
99        fields.insert(
100            "created_at".to_string(),
101            Value::TimestampValue(kv_doc.created_at.to_rfc3339()),
102        );
103
104        if let Some(expires_at) = kv_doc.expires_at {
105            fields.insert(
106                "expires_at".to_string(),
107                Value::TimestampValue(expires_at.to_rfc3339()),
108            );
109        }
110
111        Document::builder()
112            .name(format!(
113                "projects/{}/databases/{}/documents/{}/{}",
114                self.project_id, self.database_id, self.collection_name, key
115            ))
116            .fields(fields)
117            .build()
118    }
119
120    /// Converts a Firestore Document to KV document
121    fn firestore_to_kv_document(&self, doc: &Document) -> Result<KvDocument> {
122        let fields = doc.fields.as_ref().ok_or_else(|| {
123            AlienError::new(ErrorData::UnexpectedResponseFormat {
124                provider: "gcp".to_string(),
125                binding_name: "firestore".to_string(),
126                field: "fields".to_string(),
127                response_json: serde_json::to_string(doc).unwrap_or_default(),
128            })
129        })?;
130
131        let value = match fields.get("value") {
132            Some(Value::StringValue(v)) => v.clone(),
133            _ => {
134                return Err(AlienError::new(ErrorData::UnexpectedResponseFormat {
135                    provider: "gcp".to_string(),
136                    binding_name: "firestore".to_string(),
137                    field: "value".to_string(),
138                    response_json: serde_json::to_string(doc).unwrap_or_default(),
139                }))
140            }
141        };
142
143        let created_at = match fields.get("created_at") {
144            Some(Value::TimestampValue(t)) => DateTime::parse_from_rfc3339(t)
145                .map_err(|_| {
146                    AlienError::new(ErrorData::UnexpectedResponseFormat {
147                        provider: "gcp".to_string(),
148                        binding_name: "firestore".to_string(),
149                        field: "created_at".to_string(),
150                        response_json: serde_json::to_string(doc).unwrap_or_default(),
151                    })
152                })?
153                .with_timezone(&Utc),
154            _ => {
155                return Err(AlienError::new(ErrorData::UnexpectedResponseFormat {
156                    provider: "gcp".to_string(),
157                    binding_name: "firestore".to_string(),
158                    field: "created_at".to_string(),
159                    response_json: serde_json::to_string(doc).unwrap_or_default(),
160                }))
161            }
162        };
163
164        let expires_at = match fields.get("expires_at") {
165            Some(Value::TimestampValue(t)) => Some(
166                DateTime::parse_from_rfc3339(t)
167                    .map_err(|_| {
168                        AlienError::new(ErrorData::UnexpectedResponseFormat {
169                            provider: "gcp".to_string(),
170                            binding_name: "firestore".to_string(),
171                            field: "expires_at".to_string(),
172                            response_json: serde_json::to_string(doc).unwrap_or_default(),
173                        })
174                    })?
175                    .with_timezone(&Utc),
176            ),
177            _ => None,
178        };
179
180        Ok(KvDocument {
181            value,
182            created_at,
183            expires_at,
184        })
185    }
186}
187
188impl Binding for GcpFirestoreKv {}
189
190#[async_trait]
191impl Kv for GcpFirestoreKv {
192    async fn get(&self, key: &str) -> Result<Option<Vec<u8>>> {
193        validate_key(key)?;
194
195        let document_id = key;
196        let document_path = format!("{}/{}", self.collection_name, document_id);
197
198        match self
199            .client
200            .get_document(self.database_id.clone(), document_path, None, None, None)
201            .await
202        {
203            Ok(doc) => {
204                let kv_doc = self.firestore_to_kv_document(&doc)?;
205
206                // Check TTL expiry (logical expiry contract)
207                if self.is_expired(kv_doc.expires_at) {
208                    return Ok(None); // Logically expired
209                }
210
211                let value = base64::engine::general_purpose::STANDARD
212                    .decode(&kv_doc.value)
213                    .into_alien_error()
214                    .context(ErrorData::KvOperationFailed {
215                        operation: "get".to_string(),
216                        key: key.to_string(),
217                        reason: "Failed to decode base64 value".to_string(),
218                    })?;
219
220                Ok(Some(value))
221            }
222            Err(e) => {
223                // Check if this is a "not found" error
224                match &e.error {
225                    Some(alien_client_core::ErrorData::RemoteResourceNotFound { .. }) => {
226                        Ok(None) // Document doesn't exist
227                    }
228                    _ => Err(crate::error::map_cloud_client_error(
229                        e,
230                        "Failed to get Firestore document".to_string(),
231                        Some(key.to_string()),
232                    )),
233                }
234            }
235        }
236    }
237
238    async fn put(&self, key: &str, value: Vec<u8>, options: Option<PutOptions>) -> Result<bool> {
239        validate_key(key)?;
240        validate_value(&value)?;
241
242        let options = options.unwrap_or_default();
243
244        let encoded_value = base64::engine::general_purpose::STANDARD.encode(&value);
245        let kv_doc = KvDocument {
246            value: encoded_value,
247            created_at: Utc::now(),
248            expires_at: options.ttl.map(|d| Utc::now() + d),
249        };
250
251        let document = self.kv_document_to_firestore(key, &kv_doc);
252
253        if options.if_not_exists {
254            let document_id = key.to_string();
255            match self
256                .client
257                .create_document(
258                    self.database_id.clone(),
259                    self.collection_name.clone(),
260                    Some(document_id),
261                    document,
262                    None,
263                )
264                .await
265            {
266                Ok(_) => Ok(true),
267                Err(e) => {
268                    // Check if this is a conflict (document already exists)
269                    match &e.error {
270                        Some(alien_client_core::ErrorData::RemoteResourceConflict { .. }) => {
271                            Ok(false)
272                        }
273                        _ => Err(crate::error::map_cloud_client_error(
274                            e,
275                            "Failed to create Firestore document".to_string(),
276                            Some(key.to_string()),
277                        )),
278                    }
279                }
280            }
281        } else {
282            let document_id = key;
283            let document_path = format!("{}/{}", self.collection_name, document_id);
284            let document_with_name = self.kv_document_to_firestore_with_name(key, &kv_doc);
285
286            self.client
287                .patch_document(
288                    self.database_id.clone(),
289                    document_path,
290                    document_with_name,
291                    None,
292                    None,
293                    None,
294                )
295                .await
296                .map_err(|e| {
297                    crate::error::map_cloud_client_error(
298                        e,
299                        "Failed to patch Firestore document".to_string(),
300                        Some(key.to_string()),
301                    )
302                })?;
303
304            Ok(true)
305        }
306    }
307
308    async fn delete(&self, key: &str) -> Result<()> {
309        validate_key(key)?;
310
311        let document_id = key;
312        let document_path = format!("{}/{}", self.collection_name, document_id);
313
314        self.client
315            .delete_document(self.database_id.clone(), document_path, None)
316            .await
317            .map_err(|e| {
318                crate::error::map_cloud_client_error(
319                    e,
320                    "Failed to delete Firestore document".to_string(),
321                    Some(key.to_string()),
322                )
323            })?;
324
325        Ok(())
326    }
327
328    async fn exists(&self, key: &str) -> Result<bool> {
329        validate_key(key)?;
330
331        let document_id = key;
332        let document_path = format!("{}/{}", self.collection_name, document_id);
333
334        match self
335            .client
336            .get_document(self.database_id.clone(), document_path, None, None, None)
337            .await
338        {
339            Ok(doc) => {
340                let kv_doc = self.firestore_to_kv_document(&doc)?;
341
342                // Check TTL expiry (logical expiry contract)
343                Ok(!self.is_expired(kv_doc.expires_at))
344            }
345            Err(e) => {
346                match &e.error {
347                    Some(alien_client_core::ErrorData::RemoteResourceNotFound { .. }) => {
348                        Ok(false) // Document doesn't exist
349                    }
350                    _ => Err(crate::error::map_cloud_client_error(
351                        e,
352                        "Failed to get Firestore document".to_string(),
353                        Some(key.to_string()),
354                    )),
355                }
356            }
357        }
358    }
359
360    async fn scan_prefix(
361        &self,
362        prefix: &str,
363        limit: Option<usize>,
364        cursor: Option<String>,
365    ) -> Result<ScanResult> {
366        validate_key(prefix)?; // Prefix follows same key validation rules
367
368        let collection_selector = CollectionSelector::builder()
369            .collection_id(self.collection_name.clone())
370            .build();
371
372        let mut structured_query = StructuredQuery::builder()
373            .from(vec![collection_selector])
374            .order_by(vec![Order::builder()
375                .field(
376                    FieldReference::builder()
377                        .field_path("__name__".to_string())
378                        .build(),
379                )
380                .direction(Direction::Ascending)
381                .build()])
382            .build();
383
384        // Add prefix filter
385        if !prefix.is_empty() {
386            let document_id_prefix = prefix;
387            let prefix_filter = Filter::FieldFilter(
388                FieldFilter::builder()
389                    .field(
390                        FieldReference::builder()
391                            .field_path("__name__".to_string())
392                            .build(),
393                    )
394                    .op(FieldFilterOperator::GreaterThanOrEqual)
395                    .value(Value::ReferenceValue(format!(
396                        "projects/{}/databases/{}/documents/{}/{}",
397                        self.project_id, self.database_id, self.collection_name, document_id_prefix
398                    )))
399                    .build(),
400            );
401
402            structured_query.r#where = Some(prefix_filter);
403        }
404
405        if let Some(limit) = limit {
406            structured_query.limit = Some(limit as i32);
407        }
408
409        if let Some(ref cursor) = cursor {
410            // For simplicity, use offset-based pagination
411            if let Ok(offset) = cursor.parse::<i32>() {
412                structured_query.offset = Some(offset);
413            }
414        }
415
416        let query_request = RunQueryRequest::builder()
417            .parent(format!(
418                "projects/{}/databases/{}/documents",
419                self.project_id, self.database_id
420            ))
421            .query_type(QueryType::StructuredQuery(structured_query))
422            .build();
423
424        let query_responses = self
425            .client
426            .run_query(self.database_id.clone(), query_request)
427            .await
428            .map_err(|e| {
429                crate::error::map_cloud_client_error(
430                    e,
431                    "Failed to run Firestore query".to_string(),
432                    Some(prefix.to_string()),
433                )
434            })?;
435
436        let items: Vec<(String, Vec<u8>)> = query_responses
437            .iter()
438            .filter_map(|response| {
439                let doc = response.document.as_ref()?;
440                let doc_name = doc.name.as_ref()?;
441
442                // Extract document ID from document name
443                let document_id = doc_name.split('/').last()?.to_string();
444
445                // Document ID is now the key directly (no encoding needed)
446                let key = document_id;
447
448                // Check if key starts with prefix
449                if !key.starts_with(prefix) {
450                    return None;
451                }
452
453                let kv_doc = self.firestore_to_kv_document(doc).ok()?;
454
455                // Check TTL expiry
456                if self.is_expired(kv_doc.expires_at) {
457                    return None; // Skip expired items
458                }
459
460                let value = base64::engine::general_purpose::STANDARD
461                    .decode(&kv_doc.value)
462                    .ok()?;
463                Some((key, value))
464            })
465            .collect();
466
467        let next_cursor = if items.len() == limit.unwrap_or(usize::MAX) {
468            // Simple offset-based pagination
469            let current_offset = cursor
470                .as_ref()
471                .and_then(|c| c.parse::<usize>().ok())
472                .unwrap_or(0);
473            Some((current_offset + items.len()).to_string())
474        } else {
475            None
476        };
477
478        Ok(ScanResult { items, next_cursor })
479    }
480}