Skip to main content

rouchdb_adapter_http/
lib.rs

/// HTTP adapter for RouchDB.
///
/// Communicates with a remote CouchDB-compatible server via HTTP,
/// implementing the Adapter trait by mapping each method to the
/// corresponding CouchDB REST API endpoint.
6pub mod auth;
7
8use std::collections::HashMap;
9
10use async_trait::async_trait;
11use reqwest::Client;
12use serde::{Deserialize, Serialize};
13
14use rouchdb_core::adapter::Adapter;
15use rouchdb_core::document::*;
16use rouchdb_core::error::{Result, RouchError};
17
// ---------------------------------------------------------------------------
// CouchDB JSON response shapes
// ---------------------------------------------------------------------------
21
/// Database metadata as deserialized by `info()` from the response to a
/// `GET` on the adapter's base URL.
#[derive(Debug, Deserialize)]
struct CouchDbInfo {
    /// Database name; copied into `DbInfo::db_name`.
    db_name: String,
    /// Document count; copied into `DbInfo::doc_count`.
    doc_count: u64,
    /// Kept as raw JSON and converted with `parse_seq`.
    update_seq: serde_json::Value, // Can be integer or string depending on CouchDB version
}
28
/// Response body deserialized by `put_attachment` and `remove_attachment`
/// after a successful write.
#[derive(Debug, Deserialize)]
struct CouchDbPutResponse {
    /// Success flag; both callers treat a missing value as `true`.
    ok: Option<bool>,
    /// Document ID reported by the server.
    id: String,
    /// Revision reported by the server; returned as `DocResult::rev`.
    rev: String,
}
35
/// Error body that `check_error` tries to deserialize from 403 and 404
/// responses.
#[derive(Debug, Deserialize)]
struct CouchDbError {
    // Never read: `check_error` only forwards `reason`, hence the lint allowance.
    #[allow(dead_code)]
    error: String,
    /// Message forwarded into `RouchError::Forbidden` / `RouchError::NotFound`.
    reason: String,
}
42
/// JSON body that `bulk_docs` posts to `_bulk_docs`.
#[derive(Debug, Serialize)]
struct CouchDbBulkDocsRequest {
    /// Documents to write, already converted to JSON by `Document::to_json`.
    docs: Vec<serde_json::Value>,
    /// Left out of the serialized body when `None`; `bulk_docs` only sets it
    /// (to `Some(false)`) when the caller disabled `new_edits`.
    #[serde(skip_serializing_if = "Option::is_none")]
    new_edits: Option<bool>,
}
49
/// One element of the JSON array that `bulk_docs` reads back from
/// `_bulk_docs`. Every field is optional so both success and error entries
/// deserialize into the same shape.
#[derive(Debug, Deserialize)]
struct CouchDbBulkDocsResult {
    /// Explicit success flag; when absent, `bulk_docs` derives it from
    /// whether `error` is present.
    ok: Option<bool>,
    /// Document ID; `bulk_docs` substitutes an empty string when absent.
    id: Option<String>,
    /// Revision, passed through to `DocResult::rev`.
    rev: Option<String>,
    /// Error identifier, passed through to `DocResult::error`.
    error: Option<String>,
    /// Error description, passed through to `DocResult::reason`.
    reason: Option<String>,
}
58
/// JSON body that `bulk_get` posts to `_bulk_get?revs=true`.
#[derive(Debug, Serialize)]
struct CouchDbBulkGetRequest {
    /// One entry per requested document (and optionally revision).
    docs: Vec<CouchDbBulkGetDoc>,
}
63
/// A single document reference inside a `CouchDbBulkGetRequest`, built from a
/// `BulkGetItem`.
#[derive(Debug, Serialize)]
struct CouchDbBulkGetDoc {
    /// Document ID to fetch.
    id: String,
    /// Specific revision to fetch; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    rev: Option<String>,
}
70
/// Top-level response body that `bulk_get` deserializes from `_bulk_get`.
#[derive(Debug, Deserialize)]
struct CouchDbBulkGetResponse {
    /// Per-document results; each is mapped to a `BulkGetResult`.
    results: Vec<CouchDbBulkGetResult>,
}
75
/// Result group for one document ID inside a `CouchDbBulkGetResponse`.
#[derive(Debug, Deserialize)]
struct CouchDbBulkGetResult {
    /// Document ID this group belongs to.
    id: String,
    /// Outcomes for this ID; each is mapped to a `BulkGetDoc`.
    docs: Vec<CouchDbBulkGetDocResult>,
}
81
/// One outcome inside a `CouchDbBulkGetResult`: a document body under `ok`
/// and/or an error description under `error`.
#[derive(Debug, Deserialize)]
struct CouchDbBulkGetDocResult {
    /// Raw document JSON, passed through unchanged to `BulkGetDoc::ok`.
    ok: Option<serde_json::Value>,
    /// Error details, converted to a `BulkGetError` by `bulk_get`.
    error: Option<CouchDbBulkGetErrorResult>,
}
87
/// Error entry of a `_bulk_get` result; `bulk_get` copies all four fields
/// into a `BulkGetError`.
#[derive(Debug, Deserialize)]
struct CouchDbBulkGetErrorResult {
    /// Document ID the error refers to.
    id: String,
    /// Revision the error refers to.
    rev: String,
    /// Error identifier.
    error: String,
    /// Error description.
    reason: String,
}
95
/// Response body that `changes` deserializes from `_changes`.
#[derive(Debug, Deserialize)]
struct CouchDbChangesResponse {
    /// Individual change rows; each is mapped to a `ChangeEvent`.
    results: Vec<CouchDbChangeResult>,
    /// Kept as raw JSON and converted with `parse_seq`.
    last_seq: serde_json::Value,
}
101
/// A single row of a `CouchDbChangesResponse`.
#[derive(Debug, Deserialize)]
struct CouchDbChangeResult {
    /// Sequence of this change; kept as raw JSON and converted with `parse_seq`.
    seq: serde_json::Value,
    /// ID of the changed document.
    id: String,
    /// Revisions reported for this change.
    changes: Vec<CouchDbChangeRev>,
    /// Deletion flag; falls back to `false` when the field is absent.
    #[serde(default)]
    deleted: bool,
    /// Optional document body, passed through unchanged to `ChangeEvent::doc`.
    doc: Option<serde_json::Value>,
}
111
/// One revision entry in a change row's `changes` array.
#[derive(Debug, Deserialize)]
struct CouchDbChangeRev {
    /// Revision string, copied into `ChangeRev::rev`.
    rev: String,
}
116
/// Response body that `all_docs` deserializes from `_all_docs`.
#[derive(Debug, Deserialize)]
struct CouchDbAllDocsResponse {
    /// Copied into `AllDocsResponse::total_rows`.
    total_rows: u64,
    /// Copied into `AllDocsResponse::offset`.
    offset: u64,
    /// Result rows; each is mapped to an `AllDocsRow`.
    rows: Vec<CouchDbAllDocsRow>,
}
123
/// A single row of a `CouchDbAllDocsResponse`.
#[derive(Debug, Deserialize)]
struct CouchDbAllDocsRow {
    /// Document ID.
    id: String,
    /// Row key.
    key: String,
    /// Revision information for the row.
    value: CouchDbAllDocsRowValue,
    /// Optional document body, passed through unchanged to `AllDocsRow::doc`.
    doc: Option<serde_json::Value>,
}
131
/// The `value` object of an `_all_docs` row.
#[derive(Debug, Deserialize)]
struct CouchDbAllDocsRowValue {
    /// Revision of the document in this row.
    rev: String,
    /// Deletion marker; `None` when the field is absent from the row.
    #[serde(default)]
    deleted: Option<bool>,
}
138
// ---------------------------------------------------------------------------
// HttpAdapter
// ---------------------------------------------------------------------------
142
/// HTTP adapter that talks to a remote CouchDB instance.
pub struct HttpAdapter {
    /// reqwest client used for every request issued by this adapter.
    client: Client,
    /// Database URL with trailing slashes removed by the constructors; all
    /// endpoint paths are appended to it.
    base_url: String,
}
148
149impl HttpAdapter {
150    /// Create a new HTTP adapter pointing at a CouchDB database URL.
151    ///
152    /// The URL should include the database name, e.g.
153    /// `http://localhost:5984/mydb` or `http://admin:password@localhost:5984/mydb`
154    pub fn new(url: &str) -> Self {
155        let base_url = url.trim_end_matches('/').to_string();
156        Self {
157            client: Client::new(),
158            base_url,
159        }
160    }
161
162    /// Create a new HTTP adapter with a custom reqwest client.
163    pub fn with_client(url: &str, client: Client) -> Self {
164        let base_url = url.trim_end_matches('/').to_string();
165        Self { client, base_url }
166    }
167
168    /// Create a new HTTP adapter using an authenticated client.
169    ///
170    /// The `AuthClient` must have been logged in already; its internal
171    /// reqwest client (with cookie store) will be shared with this adapter.
172    pub fn with_auth_client(url: &str, auth: &auth::AuthClient) -> Self {
173        Self::with_client(url, auth.client().clone())
174    }
175
176    fn url(&self, path: &str) -> String {
177        format!("{}/{}", self.base_url, path.trim_start_matches('/'))
178    }
179
180    async fn check_error(&self, response: reqwest::Response) -> Result<reqwest::Response> {
181        let status = response.status();
182        if status.is_success() {
183            return Ok(response);
184        }
185
186        match status.as_u16() {
187            401 => Err(RouchError::Unauthorized),
188            403 => {
189                let body: CouchDbError = response.json().await.unwrap_or(CouchDbError {
190                    error: "forbidden".into(),
191                    reason: "access denied".into(),
192                });
193                Err(RouchError::Forbidden(body.reason))
194            }
195            404 => {
196                let body: CouchDbError = response.json().await.unwrap_or(CouchDbError {
197                    error: "not_found".into(),
198                    reason: "missing".into(),
199                });
200                Err(RouchError::NotFound(body.reason))
201            }
202            409 => Err(RouchError::Conflict),
203            _ => {
204                let body = response.text().await.unwrap_or_default();
205                Err(RouchError::DatabaseError(format!(
206                    "HTTP {}: {}",
207                    status, body
208                )))
209            }
210        }
211    }
212}
213
214/// Parse a CouchDB sequence value (can be integer or string).
215fn parse_seq(value: &serde_json::Value) -> Seq {
216    match value {
217        serde_json::Value::Number(n) => Seq::Num(n.as_u64().unwrap_or(0)),
218        serde_json::Value::String(s) => {
219            if let Ok(n) = s.parse::<u64>() {
220                Seq::Num(n)
221            } else {
222                Seq::Str(s.clone())
223            }
224        }
225        _ => Seq::Num(0),
226    }
227}
228
229#[async_trait]
230impl Adapter for HttpAdapter {
231    async fn info(&self) -> Result<DbInfo> {
232        let resp = self
233            .client
234            .get(&self.base_url)
235            .send()
236            .await
237            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
238        let resp = self.check_error(resp).await?;
239        let info: CouchDbInfo = resp
240            .json()
241            .await
242            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
243
244        Ok(DbInfo {
245            db_name: info.db_name,
246            doc_count: info.doc_count,
247            update_seq: parse_seq(&info.update_seq),
248        })
249    }
250
251    async fn get(&self, id: &str, opts: GetOptions) -> Result<Document> {
252        let mut url = self.url(&urlencoded(id));
253        let mut params = Vec::new();
254
255        if let Some(ref rev) = opts.rev {
256            params.push(format!("rev={}", rev));
257        }
258        if opts.conflicts {
259            params.push("conflicts=true".into());
260        }
261        if opts.revs {
262            params.push("revs=true".into());
263        }
264        if opts.revs_info {
265            params.push("revs_info=true".into());
266        }
267        if opts.latest {
268            params.push("latest=true".into());
269        }
270        if opts.attachments {
271            params.push("attachments=true".into());
272        }
273        if let Some(ref open_revs) = opts.open_revs {
274            match open_revs {
275                OpenRevs::All => params.push("open_revs=all".into()),
276                OpenRevs::Specific(revs) => {
277                    let json = serde_json::to_string(revs).unwrap_or_default();
278                    params.push(format!("open_revs={}", json));
279                }
280            }
281        }
282
283        if !params.is_empty() {
284            url = format!("{}?{}", url, params.join("&"));
285        }
286
287        let resp = self
288            .client
289            .get(&url)
290            .send()
291            .await
292            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
293        let resp = self.check_error(resp).await?;
294        let json: serde_json::Value = resp
295            .json()
296            .await
297            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
298
299        Document::from_json(json)
300    }
301
302    async fn bulk_docs(
303        &self,
304        docs: Vec<Document>,
305        opts: BulkDocsOptions,
306    ) -> Result<Vec<DocResult>> {
307        let json_docs: Vec<serde_json::Value> = docs.iter().map(|d| d.to_json()).collect();
308
309        let request = CouchDbBulkDocsRequest {
310            docs: json_docs,
311            new_edits: if opts.new_edits { None } else { Some(false) },
312        };
313
314        let resp = self
315            .client
316            .post(self.url("_bulk_docs"))
317            .json(&request)
318            .send()
319            .await
320            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
321        let resp = self.check_error(resp).await?;
322
323        let results: Vec<CouchDbBulkDocsResult> = resp
324            .json()
325            .await
326            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
327
328        Ok(results
329            .into_iter()
330            .map(|r| DocResult {
331                ok: r.ok.unwrap_or(r.error.is_none()),
332                id: r.id.unwrap_or_default(),
333                rev: r.rev,
334                error: r.error,
335                reason: r.reason,
336            })
337            .collect())
338    }
339
340    async fn all_docs(&self, opts: AllDocsOptions) -> Result<AllDocsResponse> {
341        let mut params = Vec::new();
342        if opts.include_docs {
343            params.push("include_docs=true".into());
344        }
345        if opts.descending {
346            params.push("descending=true".into());
347        }
348        if let Some(ref start) = opts.start_key {
349            params.push(format!("startkey=\"{}\"", start));
350        }
351        if let Some(ref end) = opts.end_key {
352            params.push(format!("endkey=\"{}\"", end));
353        }
354        if let Some(limit) = opts.limit {
355            params.push(format!("limit={}", limit));
356        }
357        if opts.skip > 0 {
358            params.push(format!("skip={}", opts.skip));
359        }
360        if opts.conflicts {
361            params.push("conflicts=true".into());
362        }
363        if opts.update_seq {
364            params.push("update_seq=true".into());
365        }
366
367        let mut url = self.url("_all_docs");
368        if !params.is_empty() {
369            url = format!("{}?{}", url, params.join("&"));
370        }
371
372        let resp = self
373            .client
374            .get(&url)
375            .send()
376            .await
377            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
378        let resp = self.check_error(resp).await?;
379        let result: CouchDbAllDocsResponse = resp
380            .json()
381            .await
382            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
383
384        Ok(AllDocsResponse {
385            total_rows: result.total_rows,
386            offset: result.offset,
387            rows: result
388                .rows
389                .into_iter()
390                .map(|r| AllDocsRow {
391                    id: r.id,
392                    key: r.key,
393                    value: AllDocsRowValue {
394                        rev: r.value.rev,
395                        deleted: r.value.deleted,
396                    },
397                    doc: r.doc,
398                })
399                .collect(),
400            update_seq: None, // TODO: parse from CouchDB response when update_seq=true
401        })
402    }
403
404    async fn changes(&self, opts: ChangesOptions) -> Result<ChangesResponse> {
405        let mut params = vec![format!("since={}", opts.since.to_query_string())];
406        if opts.include_docs {
407            params.push("include_docs=true".into());
408        }
409        if opts.descending {
410            params.push("descending=true".into());
411        }
412        if let Some(limit) = opts.limit {
413            params.push(format!("limit={}", limit));
414        }
415
416        if opts.conflicts {
417            params.push("conflicts=true".into());
418        }
419        if opts.style == ChangesStyle::AllDocs {
420            params.push("style=all_docs".into());
421        }
422
423        // Determine which filter to use — doc_ids and selector are mutually exclusive
424        let use_post = opts.doc_ids.is_some() || opts.selector.is_some();
425        if opts.doc_ids.is_some() {
426            params.push("filter=_doc_ids".into());
427        } else if opts.selector.is_some() {
428            params.push("filter=_selector".into());
429        }
430
431        let url = format!("{}?{}", self.url("_changes"), params.join("&"));
432
433        let resp = if use_post {
434            let body = if let Some(doc_ids) = opts.doc_ids {
435                serde_json::json!({ "doc_ids": doc_ids })
436            } else if let Some(selector) = opts.selector {
437                serde_json::json!({ "selector": selector })
438            } else {
439                serde_json::json!({})
440            };
441            self.client
442                .post(&url)
443                .json(&body)
444                .send()
445                .await
446                .map_err(|e| RouchError::DatabaseError(e.to_string()))?
447        } else {
448            self.client
449                .get(&url)
450                .send()
451                .await
452                .map_err(|e| RouchError::DatabaseError(e.to_string()))?
453        };
454
455        let resp = self.check_error(resp).await?;
456        let result: CouchDbChangesResponse = resp
457            .json()
458            .await
459            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
460
461        Ok(ChangesResponse {
462            last_seq: parse_seq(&result.last_seq),
463            results: result
464                .results
465                .into_iter()
466                .map(|r| ChangeEvent {
467                    seq: parse_seq(&r.seq),
468                    id: r.id,
469                    changes: r
470                        .changes
471                        .into_iter()
472                        .map(|c| ChangeRev { rev: c.rev })
473                        .collect(),
474                    deleted: r.deleted,
475                    doc: r.doc,
476                    conflicts: None, // CouchDB includes these inline in the doc
477                })
478                .collect(),
479        })
480    }
481
482    async fn revs_diff(&self, revs: HashMap<String, Vec<String>>) -> Result<RevsDiffResponse> {
483        let resp = self
484            .client
485            .post(self.url("_revs_diff"))
486            .json(&revs)
487            .send()
488            .await
489            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
490        let resp = self.check_error(resp).await?;
491
492        let results: HashMap<String, RevsDiffResult> = resp
493            .json()
494            .await
495            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
496
497        Ok(RevsDiffResponse { results })
498    }
499
500    async fn bulk_get(&self, docs: Vec<BulkGetItem>) -> Result<BulkGetResponse> {
501        let request = CouchDbBulkGetRequest {
502            docs: docs
503                .into_iter()
504                .map(|d| CouchDbBulkGetDoc {
505                    id: d.id,
506                    rev: d.rev,
507                })
508                .collect(),
509        };
510
511        let resp = self
512            .client
513            .post(self.url("_bulk_get?revs=true"))
514            .json(&request)
515            .send()
516            .await
517            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
518        let resp = self.check_error(resp).await?;
519
520        let result: CouchDbBulkGetResponse = resp
521            .json()
522            .await
523            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
524
525        Ok(BulkGetResponse {
526            results: result
527                .results
528                .into_iter()
529                .map(|r| BulkGetResult {
530                    id: r.id,
531                    docs: r
532                        .docs
533                        .into_iter()
534                        .map(|d| BulkGetDoc {
535                            ok: d.ok,
536                            error: d.error.map(|e| BulkGetError {
537                                id: e.id,
538                                rev: e.rev,
539                                error: e.error,
540                                reason: e.reason,
541                            }),
542                        })
543                        .collect(),
544                })
545                .collect(),
546        })
547    }
548
549    async fn put_attachment(
550        &self,
551        doc_id: &str,
552        att_id: &str,
553        rev: &str,
554        data: Vec<u8>,
555        content_type: &str,
556    ) -> Result<DocResult> {
557        let url = format!(
558            "{}/{}?rev={}",
559            self.url(&urlencoded(doc_id)),
560            urlencoded(att_id),
561            rev
562        );
563
564        let resp = self
565            .client
566            .put(&url)
567            .header("Content-Type", content_type)
568            .body(data)
569            .send()
570            .await
571            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
572        let resp = self.check_error(resp).await?;
573        let result: CouchDbPutResponse = resp
574            .json()
575            .await
576            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
577
578        Ok(DocResult {
579            ok: result.ok.unwrap_or(true),
580            id: result.id,
581            rev: Some(result.rev),
582            error: None,
583            reason: None,
584        })
585    }
586
587    async fn get_attachment(
588        &self,
589        doc_id: &str,
590        att_id: &str,
591        opts: GetAttachmentOptions,
592    ) -> Result<Vec<u8>> {
593        let mut url = format!("{}/{}", self.url(&urlencoded(doc_id)), urlencoded(att_id));
594        if let Some(ref rev) = opts.rev {
595            url = format!("{}?rev={}", url, rev);
596        }
597
598        let resp = self
599            .client
600            .get(&url)
601            .send()
602            .await
603            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
604        let resp = self.check_error(resp).await?;
605        let bytes = resp
606            .bytes()
607            .await
608            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
609
610        Ok(bytes.to_vec())
611    }
612
613    async fn remove_attachment(&self, doc_id: &str, att_id: &str, rev: &str) -> Result<DocResult> {
614        let url = format!(
615            "{}/{}?rev={}",
616            self.url(&urlencoded(doc_id)),
617            urlencoded(att_id),
618            rev
619        );
620
621        let resp = self
622            .client
623            .delete(&url)
624            .send()
625            .await
626            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
627        let resp = self.check_error(resp).await?;
628        let result: CouchDbPutResponse = resp
629            .json()
630            .await
631            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
632
633        Ok(DocResult {
634            ok: result.ok.unwrap_or(true),
635            id: result.id,
636            rev: Some(result.rev),
637            error: None,
638            reason: None,
639        })
640    }
641
642    async fn get_local(&self, id: &str) -> Result<serde_json::Value> {
643        let url = self.url(&format!("_local/{}", urlencoded(id)));
644        let resp = self
645            .client
646            .get(&url)
647            .send()
648            .await
649            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
650        let resp = self.check_error(resp).await?;
651        let json: serde_json::Value = resp
652            .json()
653            .await
654            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
655        Ok(json)
656    }
657
658    async fn put_local(&self, id: &str, doc: serde_json::Value) -> Result<()> {
659        let url = self.url(&format!("_local/{}", urlencoded(id)));
660        let resp = self
661            .client
662            .put(&url)
663            .json(&doc)
664            .send()
665            .await
666            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
667        self.check_error(resp).await?;
668        Ok(())
669    }
670
671    async fn remove_local(&self, id: &str) -> Result<()> {
672        // Need to get the current rev first
673        let doc = self.get_local(id).await?;
674        let rev = doc["_rev"].as_str().unwrap_or("");
675        let url = format!(
676            "{}?rev={}",
677            self.url(&format!("_local/{}", urlencoded(id))),
678            rev
679        );
680        let resp = self
681            .client
682            .delete(&url)
683            .send()
684            .await
685            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
686        self.check_error(resp).await?;
687        Ok(())
688    }
689
690    async fn compact(&self) -> Result<()> {
691        let resp = self
692            .client
693            .post(self.url("_compact"))
694            .header("Content-Type", "application/json")
695            .send()
696            .await
697            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
698        self.check_error(resp).await?;
699        Ok(())
700    }
701
702    async fn destroy(&self) -> Result<()> {
703        let resp = self
704            .client
705            .delete(&self.base_url)
706            .send()
707            .await
708            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
709        self.check_error(resp).await?;
710        Ok(())
711    }
712
713    async fn purge(&self, req: HashMap<String, Vec<String>>) -> Result<PurgeResponse> {
714        let resp = self
715            .client
716            .post(self.url("_purge"))
717            .json(&req)
718            .send()
719            .await
720            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
721        let resp = self.check_error(resp).await?;
722        let result: PurgeResponse = resp
723            .json()
724            .await
725            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
726        Ok(result)
727    }
728
729    async fn get_security(&self) -> Result<SecurityDocument> {
730        let resp = self
731            .client
732            .get(self.url("_security"))
733            .send()
734            .await
735            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
736        let resp = self.check_error(resp).await?;
737        let doc: SecurityDocument = resp
738            .json()
739            .await
740            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
741        Ok(doc)
742    }
743
744    async fn put_security(&self, doc: SecurityDocument) -> Result<()> {
745        let resp = self
746            .client
747            .put(self.url("_security"))
748            .json(&doc)
749            .send()
750            .await
751            .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
752        self.check_error(resp).await?;
753        Ok(())
754    }
755}
756
757/// Percent-encode a CouchDB document or attachment ID for safe URL use.
758///
759/// Encodes all characters except unreserved ones (alphanumeric, `-`, `_`, `.`, `~`).
760/// This ensures IDs containing `@`, `&`, `=`, `/`, `+`, spaces, etc. are handled correctly.
761fn urlencoded(s: &str) -> String {
762    /// Characters that do NOT need encoding in a path segment.
763    /// RFC 3986 unreserved: ALPHA / DIGIT / "-" / "." / "_" / "~"
764    const UNRESERVED: &percent_encoding::AsciiSet = &percent_encoding::NON_ALPHANUMERIC
765        .remove(b'-')
766        .remove(b'_')
767        .remove(b'.')
768        .remove(b'~');
769    percent_encoding::percent_encode(s.as_bytes(), UNRESERVED).to_string()
770}