Skip to main content

rouchdb_adapter_http/
lib.rs

1/// HTTP adapter for RouchDB.
2///
3/// Communicates with a remote CouchDB-compatible server via HTTP,
4/// implementing the Adapter trait by mapping each method to the
5/// corresponding CouchDB REST API endpoint.
6
7use std::collections::HashMap;
8
9use async_trait::async_trait;
10use reqwest::Client;
11use serde::{Deserialize, Serialize};
12
13use rouchdb_core::adapter::Adapter;
14use rouchdb_core::document::*;
15use rouchdb_core::error::{Result, RouchError};
16
17// ---------------------------------------------------------------------------
18// CouchDB JSON response shapes
19// ---------------------------------------------------------------------------
20
21#[derive(Debug, Deserialize)]
22struct CouchDbInfo {
23    db_name: String,
24    doc_count: u64,
25    update_seq: serde_json::Value, // Can be integer or string depending on CouchDB version
26}
27
28#[derive(Debug, Deserialize)]
29struct CouchDbPutResponse {
30    ok: Option<bool>,
31    id: String,
32    rev: String,
33}
34
35#[derive(Debug, Deserialize)]
36struct CouchDbError {
37    #[allow(dead_code)]
38    error: String,
39    reason: String,
40}
41
42#[derive(Debug, Serialize)]
43struct CouchDbBulkDocsRequest {
44    docs: Vec<serde_json::Value>,
45    #[serde(skip_serializing_if = "Option::is_none")]
46    new_edits: Option<bool>,
47}
48
49#[derive(Debug, Deserialize)]
50struct CouchDbBulkDocsResult {
51    ok: Option<bool>,
52    id: Option<String>,
53    rev: Option<String>,
54    error: Option<String>,
55    reason: Option<String>,
56}
57
58#[derive(Debug, Serialize)]
59struct CouchDbBulkGetRequest {
60    docs: Vec<CouchDbBulkGetDoc>,
61}
62
63#[derive(Debug, Serialize)]
64struct CouchDbBulkGetDoc {
65    id: String,
66    #[serde(skip_serializing_if = "Option::is_none")]
67    rev: Option<String>,
68}
69
70#[derive(Debug, Deserialize)]
71struct CouchDbBulkGetResponse {
72    results: Vec<CouchDbBulkGetResult>,
73}
74
75#[derive(Debug, Deserialize)]
76struct CouchDbBulkGetResult {
77    id: String,
78    docs: Vec<CouchDbBulkGetDocResult>,
79}
80
81#[derive(Debug, Deserialize)]
82struct CouchDbBulkGetDocResult {
83    ok: Option<serde_json::Value>,
84    error: Option<CouchDbBulkGetErrorResult>,
85}
86
87#[derive(Debug, Deserialize)]
88struct CouchDbBulkGetErrorResult {
89    id: String,
90    rev: String,
91    error: String,
92    reason: String,
93}
94
95#[derive(Debug, Deserialize)]
96struct CouchDbChangesResponse {
97    results: Vec<CouchDbChangeResult>,
98    last_seq: serde_json::Value,
99}
100
101#[derive(Debug, Deserialize)]
102struct CouchDbChangeResult {
103    seq: serde_json::Value,
104    id: String,
105    changes: Vec<CouchDbChangeRev>,
106    #[serde(default)]
107    deleted: bool,
108    doc: Option<serde_json::Value>,
109}
110
111#[derive(Debug, Deserialize)]
112struct CouchDbChangeRev {
113    rev: String,
114}
115
116#[derive(Debug, Deserialize)]
117struct CouchDbAllDocsResponse {
118    total_rows: u64,
119    offset: u64,
120    rows: Vec<CouchDbAllDocsRow>,
121}
122
123#[derive(Debug, Deserialize)]
124struct CouchDbAllDocsRow {
125    id: String,
126    key: String,
127    value: CouchDbAllDocsRowValue,
128    doc: Option<serde_json::Value>,
129}
130
131#[derive(Debug, Deserialize)]
132struct CouchDbAllDocsRowValue {
133    rev: String,
134    #[serde(default)]
135    deleted: Option<bool>,
136}
137
138// ---------------------------------------------------------------------------
139// HttpAdapter
140// ---------------------------------------------------------------------------
141
142/// HTTP adapter that talks to a remote CouchDB instance.
143pub struct HttpAdapter {
144    client: Client,
145    base_url: String,
146}
147
148impl HttpAdapter {
149    /// Create a new HTTP adapter pointing at a CouchDB database URL.
150    ///
151    /// The URL should include the database name, e.g.
152    /// `http://localhost:5984/mydb` or `http://admin:password@localhost:5984/mydb`
153    pub fn new(url: &str) -> Self {
154        let base_url = url.trim_end_matches('/').to_string();
155        Self {
156            client: Client::new(),
157            base_url,
158        }
159    }
160
161    /// Create a new HTTP adapter with a custom reqwest client.
162    pub fn with_client(url: &str, client: Client) -> Self {
163        let base_url = url.trim_end_matches('/').to_string();
164        Self { client, base_url }
165    }
166
167    fn url(&self, path: &str) -> String {
168        format!("{}/{}", self.base_url, path.trim_start_matches('/'))
169    }
170
171    async fn check_error(&self, response: reqwest::Response) -> Result<reqwest::Response> {
172        let status = response.status();
173        if status.is_success() {
174            return Ok(response);
175        }
176
177        match status.as_u16() {
178            401 => Err(RouchError::Unauthorized),
179            403 => {
180                let body: CouchDbError = response.json().await
181                    .unwrap_or(CouchDbError { error: "forbidden".into(), reason: "access denied".into() });
182                Err(RouchError::Forbidden(body.reason))
183            }
184            404 => {
185                let body: CouchDbError = response.json().await
186                    .unwrap_or(CouchDbError { error: "not_found".into(), reason: "missing".into() });
187                Err(RouchError::NotFound(body.reason))
188            }
189            409 => Err(RouchError::Conflict),
190            _ => {
191                let body = response.text().await.unwrap_or_default();
192                Err(RouchError::DatabaseError(format!("HTTP {}: {}", status, body)))
193            }
194        }
195    }
196}
197
198/// Parse a CouchDB sequence value (can be integer or string).
199fn parse_seq(value: &serde_json::Value) -> Seq {
200    match value {
201        serde_json::Value::Number(n) => Seq::Num(n.as_u64().unwrap_or(0)),
202        serde_json::Value::String(s) => {
203            if let Ok(n) = s.parse::<u64>() {
204                Seq::Num(n)
205            } else {
206                Seq::Str(s.clone())
207            }
208        }
209        _ => Seq::Num(0),
210    }
211}
212
213#[async_trait]
214impl Adapter for HttpAdapter {
215    async fn info(&self) -> Result<DbInfo> {
216        let resp = self.client.get(&self.base_url).send().await
217            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
218        let resp = self.check_error(resp).await?;
219        let info: CouchDbInfo = resp.json().await
220            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
221
222        Ok(DbInfo {
223            db_name: info.db_name,
224            doc_count: info.doc_count,
225            update_seq: parse_seq(&info.update_seq),
226        })
227    }
228
229    async fn get(&self, id: &str, opts: GetOptions) -> Result<Document> {
230        let mut url = self.url(&urlencoded(id));
231        let mut params = Vec::new();
232
233        if let Some(ref rev) = opts.rev {
234            params.push(format!("rev={}", rev));
235        }
236        if opts.conflicts {
237            params.push("conflicts=true".into());
238        }
239        if opts.revs {
240            params.push("revs=true".into());
241        }
242        if let Some(ref open_revs) = opts.open_revs {
243            match open_revs {
244                OpenRevs::All => params.push("open_revs=all".into()),
245                OpenRevs::Specific(revs) => {
246                    let json = serde_json::to_string(revs).unwrap_or_default();
247                    params.push(format!("open_revs={}", json));
248                }
249            }
250        }
251
252        if !params.is_empty() {
253            url = format!("{}?{}", url, params.join("&"));
254        }
255
256        let resp = self.client.get(&url).send().await
257            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
258        let resp = self.check_error(resp).await?;
259        let json: serde_json::Value = resp.json().await
260            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
261
262        Document::from_json(json)
263    }
264
265    async fn bulk_docs(
266        &self,
267        docs: Vec<Document>,
268        opts: BulkDocsOptions,
269    ) -> Result<Vec<DocResult>> {
270        let json_docs: Vec<serde_json::Value> = docs.iter().map(|d| d.to_json()).collect();
271
272        let request = CouchDbBulkDocsRequest {
273            docs: json_docs,
274            new_edits: if opts.new_edits { None } else { Some(false) },
275        };
276
277        let resp = self.client
278            .post(&self.url("_bulk_docs"))
279            .json(&request)
280            .send()
281            .await
282            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
283        let resp = self.check_error(resp).await?;
284
285        let results: Vec<CouchDbBulkDocsResult> = resp.json().await
286            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
287
288        Ok(results
289            .into_iter()
290            .map(|r| DocResult {
291                ok: r.ok.unwrap_or(r.error.is_none()),
292                id: r.id.unwrap_or_default(),
293                rev: r.rev,
294                error: r.error,
295                reason: r.reason,
296            })
297            .collect())
298    }
299
300    async fn all_docs(&self, opts: AllDocsOptions) -> Result<AllDocsResponse> {
301        let mut params = Vec::new();
302        if opts.include_docs {
303            params.push("include_docs=true".into());
304        }
305        if opts.descending {
306            params.push("descending=true".into());
307        }
308        if let Some(ref start) = opts.start_key {
309            params.push(format!("startkey=\"{}\"", start));
310        }
311        if let Some(ref end) = opts.end_key {
312            params.push(format!("endkey=\"{}\"", end));
313        }
314        if let Some(limit) = opts.limit {
315            params.push(format!("limit={}", limit));
316        }
317        if opts.skip > 0 {
318            params.push(format!("skip={}", opts.skip));
319        }
320
321        let mut url = self.url("_all_docs");
322        if !params.is_empty() {
323            url = format!("{}?{}", url, params.join("&"));
324        }
325
326        let resp = self.client.get(&url).send().await
327            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
328        let resp = self.check_error(resp).await?;
329        let result: CouchDbAllDocsResponse = resp.json().await
330            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
331
332        Ok(AllDocsResponse {
333            total_rows: result.total_rows,
334            offset: result.offset,
335            rows: result
336                .rows
337                .into_iter()
338                .map(|r| AllDocsRow {
339                    id: r.id,
340                    key: r.key,
341                    value: AllDocsRowValue {
342                        rev: r.value.rev,
343                        deleted: r.value.deleted,
344                    },
345                    doc: r.doc,
346                })
347                .collect(),
348        })
349    }
350
351    async fn changes(&self, opts: ChangesOptions) -> Result<ChangesResponse> {
352        let mut params = vec![format!("since={}", opts.since.to_query_string())];
353        if opts.include_docs {
354            params.push("include_docs=true".into());
355        }
356        if opts.descending {
357            params.push("descending=true".into());
358        }
359        if let Some(limit) = opts.limit {
360            params.push(format!("limit={}", limit));
361        }
362        if let Some(ref _doc_ids) = opts.doc_ids {
363            params.push("filter=_doc_ids".into());
364            // doc_ids need to be sent as POST body for _changes
365        }
366
367        let url = format!("{}?{}", self.url("_changes"), params.join("&"));
368
369        let resp = if opts.doc_ids.is_some() {
370            let body = serde_json::json!({
371                "doc_ids": opts.doc_ids.unwrap()
372            });
373            self.client.post(&url).json(&body).send().await
374                .map_err(|e| RouchError::DatabaseError(e.to_string()))?
375        } else {
376            self.client.get(&url).send().await
377                .map_err(|e| RouchError::DatabaseError(e.to_string()))?
378        };
379
380        let resp = self.check_error(resp).await?;
381        let result: CouchDbChangesResponse = resp.json().await
382            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
383
384        Ok(ChangesResponse {
385            last_seq: parse_seq(&result.last_seq),
386            results: result
387                .results
388                .into_iter()
389                .map(|r| ChangeEvent {
390                    seq: parse_seq(&r.seq),
391                    id: r.id,
392                    changes: r.changes.into_iter().map(|c| ChangeRev { rev: c.rev }).collect(),
393                    deleted: r.deleted,
394                    doc: r.doc,
395                })
396                .collect(),
397        })
398    }
399
400    async fn revs_diff(
401        &self,
402        revs: HashMap<String, Vec<String>>,
403    ) -> Result<RevsDiffResponse> {
404        let resp = self.client
405            .post(&self.url("_revs_diff"))
406            .json(&revs)
407            .send()
408            .await
409            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
410        let resp = self.check_error(resp).await?;
411
412        let results: HashMap<String, RevsDiffResult> = resp.json().await
413            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
414
415        Ok(RevsDiffResponse { results })
416    }
417
418    async fn bulk_get(&self, docs: Vec<BulkGetItem>) -> Result<BulkGetResponse> {
419        let request = CouchDbBulkGetRequest {
420            docs: docs
421                .into_iter()
422                .map(|d| CouchDbBulkGetDoc {
423                    id: d.id,
424                    rev: d.rev,
425                })
426                .collect(),
427        };
428
429        let resp = self.client
430            .post(&self.url("_bulk_get?revs=true"))
431            .json(&request)
432            .send()
433            .await
434            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
435        let resp = self.check_error(resp).await?;
436
437        let result: CouchDbBulkGetResponse = resp.json().await
438            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
439
440        Ok(BulkGetResponse {
441            results: result
442                .results
443                .into_iter()
444                .map(|r| BulkGetResult {
445                    id: r.id,
446                    docs: r.docs
447                        .into_iter()
448                        .map(|d| BulkGetDoc {
449                            ok: d.ok,
450                            error: d.error.map(|e| BulkGetError {
451                                id: e.id,
452                                rev: e.rev,
453                                error: e.error,
454                                reason: e.reason,
455                            }),
456                        })
457                        .collect(),
458                })
459                .collect(),
460        })
461    }
462
463    async fn put_attachment(
464        &self,
465        doc_id: &str,
466        att_id: &str,
467        rev: &str,
468        data: Vec<u8>,
469        content_type: &str,
470    ) -> Result<DocResult> {
471        let url = format!("{}/{}?rev={}", self.url(&urlencoded(doc_id)), urlencoded(att_id), rev);
472
473        let resp = self.client
474            .put(&url)
475            .header("Content-Type", content_type)
476            .body(data)
477            .send()
478            .await
479            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
480        let resp = self.check_error(resp).await?;
481        let result: CouchDbPutResponse = resp.json().await
482            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
483
484        Ok(DocResult {
485            ok: result.ok.unwrap_or(true),
486            id: result.id,
487            rev: Some(result.rev),
488            error: None,
489            reason: None,
490        })
491    }
492
493    async fn get_attachment(
494        &self,
495        doc_id: &str,
496        att_id: &str,
497        opts: GetAttachmentOptions,
498    ) -> Result<Vec<u8>> {
499        let mut url = format!("{}/{}", self.url(&urlencoded(doc_id)), urlencoded(att_id));
500        if let Some(ref rev) = opts.rev {
501            url = format!("{}?rev={}", url, rev);
502        }
503
504        let resp = self.client.get(&url).send().await
505            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
506        let resp = self.check_error(resp).await?;
507        let bytes = resp.bytes().await
508            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
509
510        Ok(bytes.to_vec())
511    }
512
513    async fn get_local(&self, id: &str) -> Result<serde_json::Value> {
514        let url = self.url(&format!("_local/{}", urlencoded(id)));
515        let resp = self.client.get(&url).send().await
516            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
517        let resp = self.check_error(resp).await?;
518        let json: serde_json::Value = resp.json().await
519            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
520        Ok(json)
521    }
522
523    async fn put_local(&self, id: &str, doc: serde_json::Value) -> Result<()> {
524        let url = self.url(&format!("_local/{}", urlencoded(id)));
525        let resp = self.client.put(&url).json(&doc).send().await
526            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
527        self.check_error(resp).await?;
528        Ok(())
529    }
530
531    async fn remove_local(&self, id: &str) -> Result<()> {
532        // Need to get the current rev first
533        let doc = self.get_local(id).await?;
534        let rev = doc["_rev"].as_str().unwrap_or("");
535        let url = format!("{}?rev={}", self.url(&format!("_local/{}", urlencoded(id))), rev);
536        let resp = self.client.delete(&url).send().await
537            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
538        self.check_error(resp).await?;
539        Ok(())
540    }
541
542    async fn compact(&self) -> Result<()> {
543        let resp = self.client
544            .post(&self.url("_compact"))
545            .header("Content-Type", "application/json")
546            .send()
547            .await
548            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
549        self.check_error(resp).await?;
550        Ok(())
551    }
552
553    async fn destroy(&self) -> Result<()> {
554        let resp = self.client.delete(&self.base_url).send().await
555            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
556        self.check_error(resp).await?;
557        Ok(())
558    }
559}
560
561/// Simple percent-encoding for document IDs (handles special chars).
562fn urlencoded(s: &str) -> String {
563    // Encode characters that are special in URLs
564    s.replace('%', "%25")
565        .replace(' ', "%20")
566        .replace('/', "%2F")
567        .replace('?', "%3F")
568        .replace('#', "%23")
569        .replace('+', "%2B")
570}