1use std::collections::HashMap;
8
9use async_trait::async_trait;
10use reqwest::Client;
11use serde::{Deserialize, Serialize};
12
13use rouchdb_core::adapter::Adapter;
14use rouchdb_core::document::*;
15use rouchdb_core::error::{Result, RouchError};
16
17#[derive(Debug, Deserialize)]
22struct CouchDbInfo {
23 db_name: String,
24 doc_count: u64,
25 update_seq: serde_json::Value, }
27
28#[derive(Debug, Deserialize)]
29struct CouchDbPutResponse {
30 ok: Option<bool>,
31 id: String,
32 rev: String,
33}
34
35#[derive(Debug, Deserialize)]
36struct CouchDbError {
37 #[allow(dead_code)]
38 error: String,
39 reason: String,
40}
41
42#[derive(Debug, Serialize)]
43struct CouchDbBulkDocsRequest {
44 docs: Vec<serde_json::Value>,
45 #[serde(skip_serializing_if = "Option::is_none")]
46 new_edits: Option<bool>,
47}
48
49#[derive(Debug, Deserialize)]
50struct CouchDbBulkDocsResult {
51 ok: Option<bool>,
52 id: Option<String>,
53 rev: Option<String>,
54 error: Option<String>,
55 reason: Option<String>,
56}
57
58#[derive(Debug, Serialize)]
59struct CouchDbBulkGetRequest {
60 docs: Vec<CouchDbBulkGetDoc>,
61}
62
63#[derive(Debug, Serialize)]
64struct CouchDbBulkGetDoc {
65 id: String,
66 #[serde(skip_serializing_if = "Option::is_none")]
67 rev: Option<String>,
68}
69
70#[derive(Debug, Deserialize)]
71struct CouchDbBulkGetResponse {
72 results: Vec<CouchDbBulkGetResult>,
73}
74
75#[derive(Debug, Deserialize)]
76struct CouchDbBulkGetResult {
77 id: String,
78 docs: Vec<CouchDbBulkGetDocResult>,
79}
80
81#[derive(Debug, Deserialize)]
82struct CouchDbBulkGetDocResult {
83 ok: Option<serde_json::Value>,
84 error: Option<CouchDbBulkGetErrorResult>,
85}
86
87#[derive(Debug, Deserialize)]
88struct CouchDbBulkGetErrorResult {
89 id: String,
90 rev: String,
91 error: String,
92 reason: String,
93}
94
95#[derive(Debug, Deserialize)]
96struct CouchDbChangesResponse {
97 results: Vec<CouchDbChangeResult>,
98 last_seq: serde_json::Value,
99}
100
101#[derive(Debug, Deserialize)]
102struct CouchDbChangeResult {
103 seq: serde_json::Value,
104 id: String,
105 changes: Vec<CouchDbChangeRev>,
106 #[serde(default)]
107 deleted: bool,
108 doc: Option<serde_json::Value>,
109}
110
111#[derive(Debug, Deserialize)]
112struct CouchDbChangeRev {
113 rev: String,
114}
115
116#[derive(Debug, Deserialize)]
117struct CouchDbAllDocsResponse {
118 total_rows: u64,
119 offset: u64,
120 rows: Vec<CouchDbAllDocsRow>,
121}
122
123#[derive(Debug, Deserialize)]
124struct CouchDbAllDocsRow {
125 id: String,
126 key: String,
127 value: CouchDbAllDocsRowValue,
128 doc: Option<serde_json::Value>,
129}
130
131#[derive(Debug, Deserialize)]
132struct CouchDbAllDocsRowValue {
133 rev: String,
134 #[serde(default)]
135 deleted: Option<bool>,
136}
137
138pub struct HttpAdapter {
144 client: Client,
145 base_url: String,
146}
147
148impl HttpAdapter {
149 pub fn new(url: &str) -> Self {
154 let base_url = url.trim_end_matches('/').to_string();
155 Self {
156 client: Client::new(),
157 base_url,
158 }
159 }
160
161 pub fn with_client(url: &str, client: Client) -> Self {
163 let base_url = url.trim_end_matches('/').to_string();
164 Self { client, base_url }
165 }
166
167 fn url(&self, path: &str) -> String {
168 format!("{}/{}", self.base_url, path.trim_start_matches('/'))
169 }
170
171 async fn check_error(&self, response: reqwest::Response) -> Result<reqwest::Response> {
172 let status = response.status();
173 if status.is_success() {
174 return Ok(response);
175 }
176
177 match status.as_u16() {
178 401 => Err(RouchError::Unauthorized),
179 403 => {
180 let body: CouchDbError = response.json().await
181 .unwrap_or(CouchDbError { error: "forbidden".into(), reason: "access denied".into() });
182 Err(RouchError::Forbidden(body.reason))
183 }
184 404 => {
185 let body: CouchDbError = response.json().await
186 .unwrap_or(CouchDbError { error: "not_found".into(), reason: "missing".into() });
187 Err(RouchError::NotFound(body.reason))
188 }
189 409 => Err(RouchError::Conflict),
190 _ => {
191 let body = response.text().await.unwrap_or_default();
192 Err(RouchError::DatabaseError(format!("HTTP {}: {}", status, body)))
193 }
194 }
195 }
196}
197
198fn parse_seq(value: &serde_json::Value) -> Seq {
200 match value {
201 serde_json::Value::Number(n) => Seq::Num(n.as_u64().unwrap_or(0)),
202 serde_json::Value::String(s) => {
203 if let Ok(n) = s.parse::<u64>() {
204 Seq::Num(n)
205 } else {
206 Seq::Str(s.clone())
207 }
208 }
209 _ => Seq::Num(0),
210 }
211}
212
213#[async_trait]
214impl Adapter for HttpAdapter {
215 async fn info(&self) -> Result<DbInfo> {
216 let resp = self.client.get(&self.base_url).send().await
217 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
218 let resp = self.check_error(resp).await?;
219 let info: CouchDbInfo = resp.json().await
220 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
221
222 Ok(DbInfo {
223 db_name: info.db_name,
224 doc_count: info.doc_count,
225 update_seq: parse_seq(&info.update_seq),
226 })
227 }
228
229 async fn get(&self, id: &str, opts: GetOptions) -> Result<Document> {
230 let mut url = self.url(&urlencoded(id));
231 let mut params = Vec::new();
232
233 if let Some(ref rev) = opts.rev {
234 params.push(format!("rev={}", rev));
235 }
236 if opts.conflicts {
237 params.push("conflicts=true".into());
238 }
239 if opts.revs {
240 params.push("revs=true".into());
241 }
242 if let Some(ref open_revs) = opts.open_revs {
243 match open_revs {
244 OpenRevs::All => params.push("open_revs=all".into()),
245 OpenRevs::Specific(revs) => {
246 let json = serde_json::to_string(revs).unwrap_or_default();
247 params.push(format!("open_revs={}", json));
248 }
249 }
250 }
251
252 if !params.is_empty() {
253 url = format!("{}?{}", url, params.join("&"));
254 }
255
256 let resp = self.client.get(&url).send().await
257 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
258 let resp = self.check_error(resp).await?;
259 let json: serde_json::Value = resp.json().await
260 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
261
262 Document::from_json(json)
263 }
264
265 async fn bulk_docs(
266 &self,
267 docs: Vec<Document>,
268 opts: BulkDocsOptions,
269 ) -> Result<Vec<DocResult>> {
270 let json_docs: Vec<serde_json::Value> = docs.iter().map(|d| d.to_json()).collect();
271
272 let request = CouchDbBulkDocsRequest {
273 docs: json_docs,
274 new_edits: if opts.new_edits { None } else { Some(false) },
275 };
276
277 let resp = self.client
278 .post(&self.url("_bulk_docs"))
279 .json(&request)
280 .send()
281 .await
282 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
283 let resp = self.check_error(resp).await?;
284
285 let results: Vec<CouchDbBulkDocsResult> = resp.json().await
286 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
287
288 Ok(results
289 .into_iter()
290 .map(|r| DocResult {
291 ok: r.ok.unwrap_or(r.error.is_none()),
292 id: r.id.unwrap_or_default(),
293 rev: r.rev,
294 error: r.error,
295 reason: r.reason,
296 })
297 .collect())
298 }
299
300 async fn all_docs(&self, opts: AllDocsOptions) -> Result<AllDocsResponse> {
301 let mut params = Vec::new();
302 if opts.include_docs {
303 params.push("include_docs=true".into());
304 }
305 if opts.descending {
306 params.push("descending=true".into());
307 }
308 if let Some(ref start) = opts.start_key {
309 params.push(format!("startkey=\"{}\"", start));
310 }
311 if let Some(ref end) = opts.end_key {
312 params.push(format!("endkey=\"{}\"", end));
313 }
314 if let Some(limit) = opts.limit {
315 params.push(format!("limit={}", limit));
316 }
317 if opts.skip > 0 {
318 params.push(format!("skip={}", opts.skip));
319 }
320
321 let mut url = self.url("_all_docs");
322 if !params.is_empty() {
323 url = format!("{}?{}", url, params.join("&"));
324 }
325
326 let resp = self.client.get(&url).send().await
327 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
328 let resp = self.check_error(resp).await?;
329 let result: CouchDbAllDocsResponse = resp.json().await
330 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
331
332 Ok(AllDocsResponse {
333 total_rows: result.total_rows,
334 offset: result.offset,
335 rows: result
336 .rows
337 .into_iter()
338 .map(|r| AllDocsRow {
339 id: r.id,
340 key: r.key,
341 value: AllDocsRowValue {
342 rev: r.value.rev,
343 deleted: r.value.deleted,
344 },
345 doc: r.doc,
346 })
347 .collect(),
348 })
349 }
350
351 async fn changes(&self, opts: ChangesOptions) -> Result<ChangesResponse> {
352 let mut params = vec![format!("since={}", opts.since.to_query_string())];
353 if opts.include_docs {
354 params.push("include_docs=true".into());
355 }
356 if opts.descending {
357 params.push("descending=true".into());
358 }
359 if let Some(limit) = opts.limit {
360 params.push(format!("limit={}", limit));
361 }
362 if let Some(ref _doc_ids) = opts.doc_ids {
363 params.push("filter=_doc_ids".into());
364 }
366
367 let url = format!("{}?{}", self.url("_changes"), params.join("&"));
368
369 let resp = if opts.doc_ids.is_some() {
370 let body = serde_json::json!({
371 "doc_ids": opts.doc_ids.unwrap()
372 });
373 self.client.post(&url).json(&body).send().await
374 .map_err(|e| RouchError::DatabaseError(e.to_string()))?
375 } else {
376 self.client.get(&url).send().await
377 .map_err(|e| RouchError::DatabaseError(e.to_string()))?
378 };
379
380 let resp = self.check_error(resp).await?;
381 let result: CouchDbChangesResponse = resp.json().await
382 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
383
384 Ok(ChangesResponse {
385 last_seq: parse_seq(&result.last_seq),
386 results: result
387 .results
388 .into_iter()
389 .map(|r| ChangeEvent {
390 seq: parse_seq(&r.seq),
391 id: r.id,
392 changes: r.changes.into_iter().map(|c| ChangeRev { rev: c.rev }).collect(),
393 deleted: r.deleted,
394 doc: r.doc,
395 })
396 .collect(),
397 })
398 }
399
400 async fn revs_diff(
401 &self,
402 revs: HashMap<String, Vec<String>>,
403 ) -> Result<RevsDiffResponse> {
404 let resp = self.client
405 .post(&self.url("_revs_diff"))
406 .json(&revs)
407 .send()
408 .await
409 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
410 let resp = self.check_error(resp).await?;
411
412 let results: HashMap<String, RevsDiffResult> = resp.json().await
413 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
414
415 Ok(RevsDiffResponse { results })
416 }
417
418 async fn bulk_get(&self, docs: Vec<BulkGetItem>) -> Result<BulkGetResponse> {
419 let request = CouchDbBulkGetRequest {
420 docs: docs
421 .into_iter()
422 .map(|d| CouchDbBulkGetDoc {
423 id: d.id,
424 rev: d.rev,
425 })
426 .collect(),
427 };
428
429 let resp = self.client
430 .post(&self.url("_bulk_get?revs=true"))
431 .json(&request)
432 .send()
433 .await
434 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
435 let resp = self.check_error(resp).await?;
436
437 let result: CouchDbBulkGetResponse = resp.json().await
438 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
439
440 Ok(BulkGetResponse {
441 results: result
442 .results
443 .into_iter()
444 .map(|r| BulkGetResult {
445 id: r.id,
446 docs: r.docs
447 .into_iter()
448 .map(|d| BulkGetDoc {
449 ok: d.ok,
450 error: d.error.map(|e| BulkGetError {
451 id: e.id,
452 rev: e.rev,
453 error: e.error,
454 reason: e.reason,
455 }),
456 })
457 .collect(),
458 })
459 .collect(),
460 })
461 }
462
463 async fn put_attachment(
464 &self,
465 doc_id: &str,
466 att_id: &str,
467 rev: &str,
468 data: Vec<u8>,
469 content_type: &str,
470 ) -> Result<DocResult> {
471 let url = format!("{}/{}?rev={}", self.url(&urlencoded(doc_id)), urlencoded(att_id), rev);
472
473 let resp = self.client
474 .put(&url)
475 .header("Content-Type", content_type)
476 .body(data)
477 .send()
478 .await
479 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
480 let resp = self.check_error(resp).await?;
481 let result: CouchDbPutResponse = resp.json().await
482 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
483
484 Ok(DocResult {
485 ok: result.ok.unwrap_or(true),
486 id: result.id,
487 rev: Some(result.rev),
488 error: None,
489 reason: None,
490 })
491 }
492
493 async fn get_attachment(
494 &self,
495 doc_id: &str,
496 att_id: &str,
497 opts: GetAttachmentOptions,
498 ) -> Result<Vec<u8>> {
499 let mut url = format!("{}/{}", self.url(&urlencoded(doc_id)), urlencoded(att_id));
500 if let Some(ref rev) = opts.rev {
501 url = format!("{}?rev={}", url, rev);
502 }
503
504 let resp = self.client.get(&url).send().await
505 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
506 let resp = self.check_error(resp).await?;
507 let bytes = resp.bytes().await
508 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
509
510 Ok(bytes.to_vec())
511 }
512
513 async fn get_local(&self, id: &str) -> Result<serde_json::Value> {
514 let url = self.url(&format!("_local/{}", urlencoded(id)));
515 let resp = self.client.get(&url).send().await
516 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
517 let resp = self.check_error(resp).await?;
518 let json: serde_json::Value = resp.json().await
519 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
520 Ok(json)
521 }
522
523 async fn put_local(&self, id: &str, doc: serde_json::Value) -> Result<()> {
524 let url = self.url(&format!("_local/{}", urlencoded(id)));
525 let resp = self.client.put(&url).json(&doc).send().await
526 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
527 self.check_error(resp).await?;
528 Ok(())
529 }
530
531 async fn remove_local(&self, id: &str) -> Result<()> {
532 let doc = self.get_local(id).await?;
534 let rev = doc["_rev"].as_str().unwrap_or("");
535 let url = format!("{}?rev={}", self.url(&format!("_local/{}", urlencoded(id))), rev);
536 let resp = self.client.delete(&url).send().await
537 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
538 self.check_error(resp).await?;
539 Ok(())
540 }
541
542 async fn compact(&self) -> Result<()> {
543 let resp = self.client
544 .post(&self.url("_compact"))
545 .header("Content-Type", "application/json")
546 .send()
547 .await
548 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
549 self.check_error(resp).await?;
550 Ok(())
551 }
552
553 async fn destroy(&self) -> Result<()> {
554 let resp = self.client.delete(&self.base_url).send().await
555 .map_err(|e| RouchError::DatabaseError(e.to_string()))?;
556 self.check_error(resp).await?;
557 Ok(())
558 }
559}
560
561fn urlencoded(s: &str) -> String {
563 s.replace('%', "%25")
565 .replace(' ', "%20")
566 .replace('/', "%2F")
567 .replace('?', "%3F")
568 .replace('#', "%23")
569 .replace('+', "%2B")
570}