1use crate::error::Error;
12use async_trait::async_trait;
13use bb8::ManageConnection;
14use mongodb::{bson::doc, options::ClientOptions, Client, Database};
15
16#[derive(Clone, Debug)]
18pub struct Mongodb {
19 client_options: ClientOptions,
20 db_name: String,
21}
22
23impl Mongodb {
24 pub fn new<T>(client_options: ClientOptions, db_name: T) -> Mongodb
26 where
27 T: Into<String>,
28 {
29 Mongodb {
30 client_options,
31 db_name: db_name.into(),
32 }
33 }
34}
35
36#[async_trait]
37#[allow(unused_qualifications)]
38impl ManageConnection for Mongodb {
39 type Connection = Database;
40 type Error = Error;
41
42 async fn connect(&self) -> Result<Self::Connection, Self::Error> {
43 let client = Client::with_options(self.client_options.clone())?;
44 Ok(client.database(&self.db_name))
45 }
46
47 async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
48 let _doc = conn.run_command(doc! { "ping": 1 }).await?;
49 Ok(())
50 }
51
52 fn has_broken(&self, _conn: &mut Self::Connection) -> bool {
53 false
54 }
55}
56
#[cfg(test)]
mod test {
    //! Integration tests for the connection manager.
    //!
    //! They read `BB8_MONGODB_URL` (required) plus `BB8_MONGODB_USER` and
    //! `BB8_MONGODB_PASSWORD` (optional) from the environment.

    use super::Mongodb;
    use anyhow::Result;
    use bb8::Pool;
    use futures::pin_mut;
    use mongodb::{
        bson::doc,
        options::{ClientOptions, Credential},
    };
    use std::{env, time::Duration};
    use tokio::time::timeout;

    /// A pool built with credentials from the environment yields a handle to
    /// the `admin` database that answers `ping`.
    #[tokio::test]
    async fn new_works() -> Result<()> {
        let url = env::var("BB8_MONGODB_URL")?;
        let mut options = ClientOptions::parse(url).await?;
        // Missing user/password variables become `None` rather than failing the test.
        let credential = Credential::builder()
            .username(env::var("BB8_MONGODB_USER").ok())
            .password(env::var("BB8_MONGODB_PASSWORD").ok())
            .build();
        options.credential = Some(credential);

        let manager = Mongodb::new(options, "admin");
        let pool = Pool::builder().build(manager).await?;
        let db = pool.get().await?;
        assert_eq!(db.name(), "admin");

        let reply = db.run_command(doc! { "ping": 1 }).await?;
        assert_eq!(doc! { "ok": 1 }, reply);
        Ok(())
    }

    /// With a wrong password, `pool.get()` must not complete within five seconds.
    #[tokio::test]
    async fn bad_connect_errors() -> Result<()> {
        let url = env::var("BB8_MONGODB_URL")?;
        let mut options = ClientOptions::parse(url).await?;
        let credential = Credential::builder()
            .username(env::var("BB8_MONGODB_USER").ok())
            .password(Some("not a password".to_string()))
            .build();
        options.credential = Some(credential);
        options.connect_timeout = Some(Duration::from_secs(3));
        options.server_selection_timeout = Some(Duration::from_secs(3));

        let manager = Mongodb::new(options, "admin");
        let pool = Pool::builder().build(manager).await?;

        let pending = pool.get();
        pin_mut!(pending);
        // `Err` here means the five-second timeout elapsed before `get`
        // finished; it is not an error returned by the pool itself.
        let outcome = timeout(Duration::from_secs(5), &mut pending).await;
        assert!(outcome.is_err());
        Ok(())
    }
}