1pub mod auth;
7
8use std::collections::HashMap;
9
10use async_trait::async_trait;
11use reqwest::Client;
12use serde::{Deserialize, Serialize};
13
14use rouchdb_core::adapter::Adapter;
15use rouchdb_core::document::*;
16use rouchdb_core::error::{Result, RouchError};
17
18#[derive(Debug, Deserialize)]
23struct CouchDbInfo {
24 db_name: String,
25 doc_count: u64,
26 update_seq: serde_json::Value, }
28
29#[derive(Debug, Deserialize)]
30struct CouchDbPutResponse {
31 ok: Option<bool>,
32 id: String,
33 rev: String,
34}
35
36#[derive(Debug, Deserialize)]
37struct CouchDbError {
38 #[allow(dead_code)]
39 error: String,
40 reason: String,
41}
42
43#[derive(Debug, Serialize)]
44struct CouchDbBulkDocsRequest {
45 docs: Vec<serde_json::Value>,
46 #[serde(skip_serializing_if = "Option::is_none")]
47 new_edits: Option<bool>,
48}
49
50#[derive(Debug, Deserialize)]
51struct CouchDbBulkDocsResult {
52 ok: Option<bool>,
53 id: Option<String>,
54 rev: Option<String>,
55 error: Option<String>,
56 reason: Option<String>,
57}
58
59#[derive(Debug, Serialize)]
60struct CouchDbBulkGetRequest {
61 docs: Vec<CouchDbBulkGetDoc>,
62}
63
64#[derive(Debug, Serialize)]
65struct CouchDbBulkGetDoc {
66 id: String,
67 #[serde(skip_serializing_if = "Option::is_none")]
68 rev: Option<String>,
69}
70
71#[derive(Debug, Deserialize)]
72struct CouchDbBulkGetResponse {
73 results: Vec<CouchDbBulkGetResult>,
74}
75
76#[derive(Debug, Deserialize)]
77struct CouchDbBulkGetResult {
78 id: String,
79 docs: Vec<CouchDbBulkGetDocResult>,
80}
81
82#[derive(Debug, Deserialize)]
83struct CouchDbBulkGetDocResult {
84 ok: Option<serde_json::Value>,
85 error: Option<CouchDbBulkGetErrorResult>,
86}
87
88#[derive(Debug, Deserialize)]
89struct CouchDbBulkGetErrorResult {
90 id: String,
91 rev: String,
92 error: String,
93 reason: String,
94}
95
96#[derive(Debug, Deserialize)]
97struct CouchDbChangesResponse {
98 results: Vec<CouchDbChangeResult>,
99 last_seq: serde_json::Value,
100}
101
102#[derive(Debug, Deserialize)]
103struct CouchDbChangeResult {
104 seq: serde_json::Value,
105 id: String,
106 changes: Vec<CouchDbChangeRev>,
107 #[serde(default)]
108 deleted: bool,
109 doc: Option<serde_json::Value>,
110}
111
112#[derive(Debug, Deserialize)]
113struct CouchDbChangeRev {
114 rev: String,
115}
116
117#[derive(Debug, Deserialize)]
118struct CouchDbAllDocsResponse {
119 total_rows: u64,
120 offset: u64,
121 rows: Vec<CouchDbAllDocsRow>,
122}
123
124#[derive(Debug, Deserialize)]
125struct CouchDbAllDocsRow {
126 id: String,
127 key: String,
128 value: CouchDbAllDocsRowValue,
129 doc: Option<serde_json::Value>,
130}
131
132#[derive(Debug, Deserialize)]
133struct CouchDbAllDocsRowValue {
134 rev: String,
135 #[serde(default)]
136 deleted: Option<bool>,
137}
138
139pub struct HttpAdapter {
145 client: Client,
146 base_url: String,
147}
148
149impl HttpAdapter {
150 pub fn new(url: &str) -> Self {
155 let base_url = url.trim_end_matches('/').to_string();
156 Self {
157 client: Client::new(),
158 base_url,
159 }
160 }
161
162 pub fn with_client(url: &str, client: Client) -> Self {
164 let base_url = url.trim_end_matches('/').to_string();
165 Self { client, base_url }
166 }
167
168 pub fn with_auth_client(url: &str, auth: &auth::AuthClient) -> Self {
173 Self::with_client(url, auth.client().clone())
174 }
175
176 fn url(&self, path: &str) -> String {
177 format!("{}/{}", self.base_url, path.trim_start_matches('/'))
178 }
179
180 async fn check_error(&self, response: reqwest::Response) -> Result<reqwest::Response> {
181 let status = response.status();
182 if status.is_success() {
183 return Ok(response);
184 }
185
186 match status.as_u16() {
187 401 => Err(RouchError::Unauthorized),
188 403 => {
189 let body: CouchDbError = response.json().await.unwrap_or(CouchDbError {
190 error: "forbidden".into(),
191 reason: "access denied".into(),
192 });
193 Err(RouchError::Forbidden(body.reason))
194 }
195 404 => {
196 let body: CouchDbError = response.json().await.unwrap_or(CouchDbError {
197 error: "not_found".into(),
198 reason: "missing".into(),
199 });
200 Err(RouchError::NotFound(body.reason))
201 }
202 409 => Err(RouchError::Conflict),
203 _ => {
204 let body = response.text().await.unwrap_or_default();
205 Err(RouchError::DatabaseError(format!(
206 "HTTP {}: {}",
207 status, body
208 )))
209 }
210 }
211 }
212}
213
214fn parse_seq(value: &serde_json::Value) -> Seq {
216 match value {
217 serde_json::Value::Number(n) => Seq::Num(n.as_u64().unwrap_or(0)),
218 serde_json::Value::String(s) => {
219 if let Ok(n) = s.parse::<u64>() {
220 Seq::Num(n)
221 } else {
222 Seq::Str(s.clone())
223 }
224 }
225 _ => Seq::Num(0),
226 }
227}
228
229#[async_trait]
230impl Adapter for HttpAdapter {
231 async fn info(&self) -> Result<DbInfo> {
232 let resp = self
233 .client
234 .get(&self.base_url)
235 .send()
236 .await
237 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
238 let resp = self.check_error(resp).await?;
239 let info: CouchDbInfo = resp
240 .json()
241 .await
242 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
243
244 Ok(DbInfo {
245 db_name: info.db_name,
246 doc_count: info.doc_count,
247 update_seq: parse_seq(&info.update_seq),
248 })
249 }
250
251 async fn get(&self, id: &str, opts: GetOptions) -> Result<Document> {
252 let mut url = self.url(&urlencoded(id));
253 let mut params = Vec::new();
254
255 if let Some(ref rev) = opts.rev {
256 params.push(format!("rev={}", rev));
257 }
258 if opts.conflicts {
259 params.push("conflicts=true".into());
260 }
261 if opts.revs {
262 params.push("revs=true".into());
263 }
264 if opts.revs_info {
265 params.push("revs_info=true".into());
266 }
267 if opts.latest {
268 params.push("latest=true".into());
269 }
270 if opts.attachments {
271 params.push("attachments=true".into());
272 }
273 if let Some(ref open_revs) = opts.open_revs {
274 match open_revs {
275 OpenRevs::All => params.push("open_revs=all".into()),
276 OpenRevs::Specific(revs) => {
277 let json = serde_json::to_string(revs).unwrap_or_default();
278 params.push(format!("open_revs={}", json));
279 }
280 }
281 }
282
283 if !params.is_empty() {
284 url = format!("{}?{}", url, params.join("&"));
285 }
286
287 let resp = self
288 .client
289 .get(&url)
290 .send()
291 .await
292 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
293 let resp = self.check_error(resp).await?;
294 let json: serde_json::Value = resp
295 .json()
296 .await
297 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
298
299 Document::from_json(json)
300 }
301
302 async fn bulk_docs(
303 &self,
304 docs: Vec<Document>,
305 opts: BulkDocsOptions,
306 ) -> Result<Vec<DocResult>> {
307 let json_docs: Vec<serde_json::Value> = docs.iter().map(|d| d.to_json()).collect();
308
309 let request = CouchDbBulkDocsRequest {
310 docs: json_docs,
311 new_edits: if opts.new_edits { None } else { Some(false) },
312 };
313
314 let resp = self
315 .client
316 .post(self.url("_bulk_docs"))
317 .json(&request)
318 .send()
319 .await
320 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
321 let resp = self.check_error(resp).await?;
322
323 let results: Vec<CouchDbBulkDocsResult> = resp
324 .json()
325 .await
326 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
327
328 Ok(results
329 .into_iter()
330 .map(|r| DocResult {
331 ok: r.ok.unwrap_or(r.error.is_none()),
332 id: r.id.unwrap_or_default(),
333 rev: r.rev,
334 error: r.error,
335 reason: r.reason,
336 })
337 .collect())
338 }
339
340 async fn all_docs(&self, opts: AllDocsOptions) -> Result<AllDocsResponse> {
341 let mut params = Vec::new();
342 if opts.include_docs {
343 params.push("include_docs=true".into());
344 }
345 if opts.descending {
346 params.push("descending=true".into());
347 }
348 if let Some(ref start) = opts.start_key {
349 params.push(format!("startkey=\"{}\"", start));
350 }
351 if let Some(ref end) = opts.end_key {
352 params.push(format!("endkey=\"{}\"", end));
353 }
354 if let Some(limit) = opts.limit {
355 params.push(format!("limit={}", limit));
356 }
357 if opts.skip > 0 {
358 params.push(format!("skip={}", opts.skip));
359 }
360 if opts.conflicts {
361 params.push("conflicts=true".into());
362 }
363 if opts.update_seq {
364 params.push("update_seq=true".into());
365 }
366
367 let mut url = self.url("_all_docs");
368 if !params.is_empty() {
369 url = format!("{}?{}", url, params.join("&"));
370 }
371
372 let resp = self
373 .client
374 .get(&url)
375 .send()
376 .await
377 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
378 let resp = self.check_error(resp).await?;
379 let result: CouchDbAllDocsResponse = resp
380 .json()
381 .await
382 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
383
384 Ok(AllDocsResponse {
385 total_rows: result.total_rows,
386 offset: result.offset,
387 rows: result
388 .rows
389 .into_iter()
390 .map(|r| AllDocsRow {
391 id: r.id,
392 key: r.key,
393 value: AllDocsRowValue {
394 rev: r.value.rev,
395 deleted: r.value.deleted,
396 },
397 doc: r.doc,
398 })
399 .collect(),
400 update_seq: None, })
402 }
403
404 async fn changes(&self, opts: ChangesOptions) -> Result<ChangesResponse> {
405 let mut params = vec![format!("since={}", opts.since.to_query_string())];
406 if opts.include_docs {
407 params.push("include_docs=true".into());
408 }
409 if opts.descending {
410 params.push("descending=true".into());
411 }
412 if let Some(limit) = opts.limit {
413 params.push(format!("limit={}", limit));
414 }
415
416 if opts.conflicts {
417 params.push("conflicts=true".into());
418 }
419 if opts.style == ChangesStyle::AllDocs {
420 params.push("style=all_docs".into());
421 }
422
423 let use_post = opts.doc_ids.is_some() || opts.selector.is_some();
425 if opts.doc_ids.is_some() {
426 params.push("filter=_doc_ids".into());
427 } else if opts.selector.is_some() {
428 params.push("filter=_selector".into());
429 }
430
431 let url = format!("{}?{}", self.url("_changes"), params.join("&"));
432
433 let resp = if use_post {
434 let body = if let Some(doc_ids) = opts.doc_ids {
435 serde_json::json!({ "doc_ids": doc_ids })
436 } else if let Some(selector) = opts.selector {
437 serde_json::json!({ "selector": selector })
438 } else {
439 serde_json::json!({})
440 };
441 self.client
442 .post(&url)
443 .json(&body)
444 .send()
445 .await
446 .map_err(|e| RouchError::DatabaseError(e.to_string()))?
447 } else {
448 self.client
449 .get(&url)
450 .send()
451 .await
452 .map_err(|e| RouchError::DatabaseError(e.to_string()))?
453 };
454
455 let resp = self.check_error(resp).await?;
456 let result: CouchDbChangesResponse = resp
457 .json()
458 .await
459 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
460
461 Ok(ChangesResponse {
462 last_seq: parse_seq(&result.last_seq),
463 results: result
464 .results
465 .into_iter()
466 .map(|r| ChangeEvent {
467 seq: parse_seq(&r.seq),
468 id: r.id,
469 changes: r
470 .changes
471 .into_iter()
472 .map(|c| ChangeRev { rev: c.rev })
473 .collect(),
474 deleted: r.deleted,
475 doc: r.doc,
476 conflicts: None, })
478 .collect(),
479 })
480 }
481
482 async fn revs_diff(&self, revs: HashMap<String, Vec<String>>) -> Result<RevsDiffResponse> {
483 let resp = self
484 .client
485 .post(self.url("_revs_diff"))
486 .json(&revs)
487 .send()
488 .await
489 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
490 let resp = self.check_error(resp).await?;
491
492 let results: HashMap<String, RevsDiffResult> = resp
493 .json()
494 .await
495 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
496
497 Ok(RevsDiffResponse { results })
498 }
499
500 async fn bulk_get(&self, docs: Vec<BulkGetItem>) -> Result<BulkGetResponse> {
501 let request = CouchDbBulkGetRequest {
502 docs: docs
503 .into_iter()
504 .map(|d| CouchDbBulkGetDoc {
505 id: d.id,
506 rev: d.rev,
507 })
508 .collect(),
509 };
510
511 let resp = self
512 .client
513 .post(self.url("_bulk_get?revs=true"))
514 .json(&request)
515 .send()
516 .await
517 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
518 let resp = self.check_error(resp).await?;
519
520 let result: CouchDbBulkGetResponse = resp
521 .json()
522 .await
523 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
524
525 Ok(BulkGetResponse {
526 results: result
527 .results
528 .into_iter()
529 .map(|r| BulkGetResult {
530 id: r.id,
531 docs: r
532 .docs
533 .into_iter()
534 .map(|d| BulkGetDoc {
535 ok: d.ok,
536 error: d.error.map(|e| BulkGetError {
537 id: e.id,
538 rev: e.rev,
539 error: e.error,
540 reason: e.reason,
541 }),
542 })
543 .collect(),
544 })
545 .collect(),
546 })
547 }
548
549 async fn put_attachment(
550 &self,
551 doc_id: &str,
552 att_id: &str,
553 rev: &str,
554 data: Vec<u8>,
555 content_type: &str,
556 ) -> Result<DocResult> {
557 let url = format!(
558 "{}/{}?rev={}",
559 self.url(&urlencoded(doc_id)),
560 urlencoded(att_id),
561 rev
562 );
563
564 let resp = self
565 .client
566 .put(&url)
567 .header("Content-Type", content_type)
568 .body(data)
569 .send()
570 .await
571 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
572 let resp = self.check_error(resp).await?;
573 let result: CouchDbPutResponse = resp
574 .json()
575 .await
576 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
577
578 Ok(DocResult {
579 ok: result.ok.unwrap_or(true),
580 id: result.id,
581 rev: Some(result.rev),
582 error: None,
583 reason: None,
584 })
585 }
586
587 async fn get_attachment(
588 &self,
589 doc_id: &str,
590 att_id: &str,
591 opts: GetAttachmentOptions,
592 ) -> Result<Vec<u8>> {
593 let mut url = format!("{}/{}", self.url(&urlencoded(doc_id)), urlencoded(att_id));
594 if let Some(ref rev) = opts.rev {
595 url = format!("{}?rev={}", url, rev);
596 }
597
598 let resp = self
599 .client
600 .get(&url)
601 .send()
602 .await
603 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
604 let resp = self.check_error(resp).await?;
605 let bytes = resp
606 .bytes()
607 .await
608 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
609
610 Ok(bytes.to_vec())
611 }
612
613 async fn remove_attachment(&self, doc_id: &str, att_id: &str, rev: &str) -> Result<DocResult> {
614 let url = format!(
615 "{}/{}?rev={}",
616 self.url(&urlencoded(doc_id)),
617 urlencoded(att_id),
618 rev
619 );
620
621 let resp = self
622 .client
623 .delete(&url)
624 .send()
625 .await
626 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
627 let resp = self.check_error(resp).await?;
628 let result: CouchDbPutResponse = resp
629 .json()
630 .await
631 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
632
633 Ok(DocResult {
634 ok: result.ok.unwrap_or(true),
635 id: result.id,
636 rev: Some(result.rev),
637 error: None,
638 reason: None,
639 })
640 }
641
642 async fn get_local(&self, id: &str) -> Result<serde_json::Value> {
643 let url = self.url(&format!("_local/{}", urlencoded(id)));
644 let resp = self
645 .client
646 .get(&url)
647 .send()
648 .await
649 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
650 let resp = self.check_error(resp).await?;
651 let json: serde_json::Value = resp
652 .json()
653 .await
654 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
655 Ok(json)
656 }
657
658 async fn put_local(&self, id: &str, doc: serde_json::Value) -> Result<()> {
659 let url = self.url(&format!("_local/{}", urlencoded(id)));
660 let resp = self
661 .client
662 .put(&url)
663 .json(&doc)
664 .send()
665 .await
666 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
667 self.check_error(resp).await?;
668 Ok(())
669 }
670
671 async fn remove_local(&self, id: &str) -> Result<()> {
672 let doc = self.get_local(id).await?;
674 let rev = doc["_rev"].as_str().unwrap_or("");
675 let url = format!(
676 "{}?rev={}",
677 self.url(&format!("_local/{}", urlencoded(id))),
678 rev
679 );
680 let resp = self
681 .client
682 .delete(&url)
683 .send()
684 .await
685 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
686 self.check_error(resp).await?;
687 Ok(())
688 }
689
690 async fn compact(&self) -> Result<()> {
691 let resp = self
692 .client
693 .post(self.url("_compact"))
694 .header("Content-Type", "application/json")
695 .send()
696 .await
697 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
698 self.check_error(resp).await?;
699 Ok(())
700 }
701
702 async fn destroy(&self) -> Result<()> {
703 let resp = self
704 .client
705 .delete(&self.base_url)
706 .send()
707 .await
708 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
709 self.check_error(resp).await?;
710 Ok(())
711 }
712
713 async fn purge(&self, req: HashMap<String, Vec<String>>) -> Result<PurgeResponse> {
714 let resp = self
715 .client
716 .post(self.url("_purge"))
717 .json(&req)
718 .send()
719 .await
720 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
721 let resp = self.check_error(resp).await?;
722 let result: PurgeResponse = resp
723 .json()
724 .await
725 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
726 Ok(result)
727 }
728
729 async fn get_security(&self) -> Result<SecurityDocument> {
730 let resp = self
731 .client
732 .get(self.url("_security"))
733 .send()
734 .await
735 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
736 let resp = self.check_error(resp).await?;
737 let doc: SecurityDocument = resp
738 .json()
739 .await
740 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
741 Ok(doc)
742 }
743
744 async fn put_security(&self, doc: SecurityDocument) -> Result<()> {
745 let resp = self
746 .client
747 .put(self.url("_security"))
748 .json(&doc)
749 .send()
750 .await
751 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
752 self.check_error(resp).await?;
753 Ok(())
754 }
755}
756
757fn urlencoded(s: &str) -> String {
762 const UNRESERVED: &percent_encoding::AsciiSet = &percent_encoding::NON_ALPHANUMERIC
765 .remove(b'-')
766 .remove(b'_')
767 .remove(b'.')
768 .remove(b'~');
769 percent_encoding::percent_encode(s.as_bytes(), UNRESERVED).to_string()
770}