firebase_admin_sdk/firestore/
mod.rs1pub mod listen;
14pub mod models;
15pub mod query;
16pub mod reference;
17pub mod snapshot;
18pub mod transaction;
19pub mod batch;
20
21#[cfg(test)]
22mod tests;
23
24use self::batch::WriteBatch;
25use self::reference::{CollectionReference, DocumentReference};
26use self::transaction::Transaction;
27use crate::core::middleware::AuthMiddleware;
28use crate::core::parse_error_response;
29use crate::firestore::models::{
30 BeginTransactionRequest, BeginTransactionResponse, ListCollectionIdsRequest,
31 ListCollectionIdsResponse, RollbackRequest, TransactionOptions,
32};
33use reqwest::{header, Client};
34use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
35use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
36use std::future::Future;
37use thiserror::Error;
38
/// URL template for the documents root of a project's `(default)` Firestore database.
///
/// The literal `{project_id}` placeholder is substituted with the real project
/// ID when a client is constructed.
const FIRESTORE_V1_API: &str =
    "https://firestore.googleapis.com/v1/projects/{project_id}/databases/(default)/documents";
41
42#[derive(Error, Debug)]
44pub enum FirestoreError {
45 #[error("HTTP Request failed: {0}")]
47 RequestError(#[from] reqwest::Error),
48 #[error("Middleware error: {0}")]
50 MiddlewareError(#[from] reqwest_middleware::Error),
51 #[error("API error: {0}")]
53 ApiError(String),
54 #[error("Serialization error: {0}")]
56 SerializationError(#[from] serde_json::Error),
57 #[error("Transaction failed: {0}")]
59 TransactionError(String),
60}
61
62pub struct FirebaseFirestore {
64 client: ClientWithMiddleware,
65 base_url: String,
66}
67
68impl FirebaseFirestore {
69 pub fn new(middleware: AuthMiddleware) -> Self {
73 let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3);
74
75 let client = ClientBuilder::new(Client::new())
76 .with(RetryTransientMiddleware::new_with_policy(retry_policy))
77 .with(middleware.clone())
78 .build();
79
80 let project_id = middleware.key.project_id.clone().unwrap_or_default();
81 let base_url = FIRESTORE_V1_API.replace("{project_id}", &project_id);
82
83 Self { client, base_url }
84 }
85
86 pub fn new_with_url(middleware: AuthMiddleware, base_url: String) -> Self {
88 let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3);
89
90 let client = ClientBuilder::new(Client::new())
91 .with(RetryTransientMiddleware::new_with_policy(retry_policy))
92 .with(middleware.clone())
93 .build();
94
95 Self { client, base_url }
96 }
97
98 #[cfg(test)]
99 pub(crate) fn new_with_client(client: ClientWithMiddleware, base_url: String) -> Self {
100 Self { client, base_url }
101 }
102
103 pub fn collection<'a>(&'a self, collection_id: &str) -> CollectionReference<'a> {
109 CollectionReference {
110 client: &self.client,
111 path: format!("{}/{}", self.base_url, collection_id),
112 }
113 }
114
115 pub async fn list_collections(&self) -> Result<Vec<CollectionReference<'_>>, FirestoreError> {
117 let url = format!("{}:listCollectionIds", self.base_url);
118 let mut collections = Vec::new();
119 let mut next_page_token = None;
120
121 loop {
122 let request = ListCollectionIdsRequest {
123 page_size: Some(100),
124 page_token: next_page_token.take(),
125 };
126
127 let response = self
128 .client
129 .post(&url)
130 .header(header::CONTENT_TYPE, "application/json")
131 .body(serde_json::to_vec(&request)?)
132 .send()
133 .await?;
134
135 if !response.status().is_success() {
136 return Err(FirestoreError::ApiError(parse_error_response(response, "List collections failed").await));
137 }
138
139 let result: ListCollectionIdsResponse = response.json().await?;
140 for id in result.collection_ids {
141 collections.push(self.collection(&id));
142 }
143
144 if let Some(token) = result.next_page_token {
145 if token.is_empty() {
146 break;
147 }
148 next_page_token = Some(token);
149 } else {
150 break;
151 }
152 }
153
154 Ok(collections)
155 }
156
157 pub fn doc<'a>(&'a self, document_path: &str) -> DocumentReference<'a> {
163 DocumentReference {
164 client: &self.client,
165 path: format!("{}/{}", self.base_url, document_path),
166 }
167 }
168
169 pub fn batch(&self) -> WriteBatch<'_> {
171 WriteBatch::new(&self.client, self.base_url.clone())
172 }
173
174 pub async fn begin_transaction(
178 &self,
179 options: Option<TransactionOptions>,
180 ) -> Result<Transaction, FirestoreError> {
181 let url = format!(
182 "{}:beginTransaction",
183 self.base_url.split("/documents").next().unwrap()
184 );
185
186 let request = BeginTransactionRequest { options };
187
188 let response = self
189 .client
190 .post(&url)
191 .header(header::CONTENT_TYPE, "application/json")
192 .body(serde_json::to_vec(&request)?)
193 .send()
194 .await?;
195
196 if !response.status().is_success() {
197 return Err(FirestoreError::ApiError(parse_error_response(response, "Begin transaction failed").await));
198 }
199
200 let result: BeginTransactionResponse = response.json().await?;
201 Ok(Transaction::new(
202 self.client.clone(),
203 self.base_url.clone(),
204 result.transaction,
205 ))
206 }
207
208 pub async fn rollback(&self, transaction_id: &str) -> Result<(), FirestoreError> {
210 let url = format!(
211 "{}:rollback",
212 self.base_url.split("/documents").next().unwrap()
213 );
214
215 let request = RollbackRequest {
216 transaction: transaction_id.to_string(),
217 };
218
219 let response = self
220 .client
221 .post(&url)
222 .header(header::CONTENT_TYPE, "application/json")
223 .body(serde_json::to_vec(&request)?)
224 .send()
225 .await?;
226
227 if !response.status().is_success() {
228 return Err(FirestoreError::ApiError(parse_error_response(response, "Rollback transaction failed").await));
229 }
230
231 Ok(())
232 }
233
234 pub async fn run_transaction<F, Fut, R>(&self, update_fn: F) -> Result<R, FirestoreError>
242 where
243 F: Fn(Transaction) -> Fut,
244 Fut: Future<Output = Result<R, FirestoreError>>,
245 {
246 let mut retry_count = 0;
247 let max_retries = 5;
248
249 loop {
250 let transaction = self.begin_transaction(None).await?;
251 let transaction_id = transaction.transaction_id.clone();
252
253 let transaction_clone = transaction.clone();
255
256 match update_fn(transaction_clone).await {
257 Ok(result) => {
258 match transaction.commit().await {
259 Ok(_) => return Ok(result),
260 Err(FirestoreError::ApiError(msg)) if msg.contains("ABORTED") || msg.contains("status: 409") || msg.contains("Aborted") => {
261 retry_count += 1;
263 if retry_count >= max_retries {
264 return Err(FirestoreError::TransactionError("Max retries reached".into()));
265 }
266 continue;
268 }
269 Err(e) => return Err(e),
270 }
271 }
272 Err(e) => {
273 let _ = self.rollback(&transaction_id).await;
274 return Err(e);
275 }
276 }
277 }
278 }
279}