Skip to main content

rustvello_mongo/
connection.rs

1use mongodb::{Client, Database};
2use tokio::sync::OnceCell;
3
4use rustvello_core::error::{RustvelloError, RustvelloResult};
5
6/// Shared MongoDB connection pool.
7///
8/// Uses `OnceCell` for lazy, one-time async initialization of the
9/// MongoDB `Client`. All backends share the same pool instance via `Arc`.
10///
11/// Data is isolated by `app_id`: the effective database name is
12/// `{db_name}_{app_id}`, so two pools created with different `app_id`
13/// values against the same MongoDB instance will not see each other's data.
14#[non_exhaustive]
15pub struct MongoPool {
16    uri: String,
17    db_name: String,
18    client: OnceCell<Client>,
19}
20
21impl MongoPool {
22    /// Create a new pool targeting the given MongoDB URI and database name.
23    ///
24    /// The `app_id` is appended to `db_name` (as `{db_name}_{app_id}`) so
25    /// that different applications sharing the same MongoDB cluster are
26    /// fully isolated.
27    pub fn new(uri: &str, db_name: &str, app_id: &str) -> Self {
28        Self::with_options(uri, db_name, app_id, None)
29    }
30
31    /// Create a new pool with an explicit maximum pool size.
32    ///
33    /// When `max_pool_size` is `None`, the MongoDB driver's default (100) is used.
34    pub fn with_options(
35        uri: &str,
36        db_name: &str,
37        app_id: &str,
38        max_pool_size: Option<u32>,
39    ) -> Self {
40        let mut uri_string = uri.to_string();
41        if let Some(size) = max_pool_size {
42            // Append maxPoolSize as a connection-string option
43            let sep = if uri_string.contains('?') { '&' } else { '?' };
44            uri_string = format!("{uri_string}{sep}maxPoolSize={size}");
45        }
46        let effective_db_name = format!("{db_name}_{app_id}");
47        Self {
48            uri: uri_string,
49            db_name: effective_db_name,
50            client: OnceCell::new(),
51        }
52    }
53
54    /// Get (or lazily create) a handle to the configured database.
55    ///
56    /// On first access this also creates recommended indexes. Missing
57    /// indexes only affect performance, so creation failures are logged
58    /// but do not prevent startup.
59    pub async fn db(&self) -> RustvelloResult<Database> {
60        let client = self
61            .client
62            .get_or_try_init(|| async {
63                let c = Client::with_uri_str(&self.uri).await.map_err(|e| {
64                    RustvelloError::state_backend(format!("MongoDB connect: {}", e))
65                })?;
66                // Best-effort index creation on first connect
67                if let Err(e) = ensure_indexes(&c.database(&self.db_name)).await {
68                    tracing::warn!("MongoDB index creation failed (non-fatal): {e}");
69                }
70                Ok(c)
71            })
72            .await?;
73        Ok(client.database(&self.db_name))
74    }
75}
76
77/// Create indexes that are important for query performance.
78async fn ensure_indexes(db: &Database) -> Result<(), mongodb::error::Error> {
79    use mongodb::IndexModel;
80
81    let status_col = db.collection::<mongodb::bson::Document>("orch_status");
82    status_col
83        .create_index(
84            IndexModel::builder()
85                .keys(mongodb::bson::doc! { "task_id": 1 })
86                .build(),
87        )
88        .await?;
89    status_col
90        .create_index(
91            IndexModel::builder()
92                .keys(mongodb::bson::doc! { "call_id": 1 })
93                .build(),
94        )
95        .await?;
96    status_col
97        .create_index(
98            IndexModel::builder()
99                .keys(mongodb::bson::doc! { "status_name": 1 })
100                .build(),
101        )
102        .await?;
103
104    let cond_col = db.collection::<mongodb::bson::Document>("trg_conditions");
105    cond_col
106        .create_index(
107            IndexModel::builder()
108                .keys(mongodb::bson::doc! { "condition_type": 1 })
109                .build(),
110        )
111        .await?;
112    cond_col
113        .create_index(
114            IndexModel::builder()
115                .keys(mongodb::bson::doc! { "event_code": 1 })
116                .build(),
117        )
118        .await?;
119    cond_col
120        .create_index(
121            IndexModel::builder()
122                .keys(mongodb::bson::doc! { "task_ids": 1 })
123                .build(),
124        )
125        .await?;
126
127    let trigger_col = db.collection::<mongodb::bson::Document>("trg_definitions");
128    trigger_col
129        .create_index(
130            IndexModel::builder()
131                .keys(mongodb::bson::doc! { "task_id": 1 })
132                .build(),
133        )
134        .await?;
135    trigger_col
136        .create_index(
137            IndexModel::builder()
138                .keys(mongodb::bson::doc! { "condition_ids": 1 })
139                .build(),
140        )
141        .await?;
142
143    // State backend: history indexes for runner and time-range queries
144    let history_col = db.collection::<mongodb::bson::Document>("state_history");
145    history_col
146        .create_index(
147            IndexModel::builder()
148                .keys(mongodb::bson::doc! { "runner_id": 1 })
149                .build(),
150        )
151        .await?;
152    history_col
153        .create_index(
154            IndexModel::builder()
155                .keys(mongodb::bson::doc! { "timestamp": 1 })
156                .build(),
157        )
158        .await?;
159    history_col
160        .create_index(
161            IndexModel::builder()
162                .keys(mongodb::bson::doc! { "invocation_id": 1 })
163                .build(),
164        )
165        .await?;
166
167    // State backend: runner contexts index for parent lookups
168    let runner_col = db.collection::<mongodb::bson::Document>("state_runner_contexts");
169    runner_col
170        .create_index(
171            IndexModel::builder()
172                .keys(mongodb::bson::doc! { "parent_runner_id": 1 })
173                .build(),
174        )
175        .await?;
176
177    // State backend: workflow runs index
178    let wf_runs_col = db.collection::<mongodb::bson::Document>("state_workflow_runs");
179    wf_runs_col
180        .create_index(
181            IndexModel::builder()
182                .keys(mongodb::bson::doc! { "workflow_type": 1 })
183                .build(),
184        )
185        .await?;
186
187    Ok(())
188}
189
190pub(crate) fn mongo_err(e: mongodb::error::Error) -> RustvelloError {
191    RustvelloError::state_backend(format!("MongoDB: {}", e))
192}
193
#[cfg(test)]
mod tests {
    use super::*;

    /// `new` keeps the URI untouched and suffixes the database name with the app id.
    #[test]
    fn pool_stores_uri_and_db_name() {
        let pool = MongoPool::new("mongodb://localhost:27017", "test_db", "my_app");
        assert_eq!(pool.uri, "mongodb://localhost:27017");
        assert_eq!(pool.db_name, "test_db_my_app");
    }

    /// `with_options` adds `maxPoolSize` with the separator that fits the URI.
    #[test]
    fn with_options_appends_max_pool_size() {
        // Existing options: joined with `&`.
        let pool = MongoPool::with_options(
            "mongodb://localhost:27017/?retryWrites=true",
            "test_db",
            "my_app",
            Some(8),
        );
        assert_eq!(
            pool.uri,
            "mongodb://localhost:27017/?retryWrites=true&maxPoolSize=8"
        );
        assert_eq!(pool.db_name, "test_db_my_app");

        // Host list already terminated by `/`: options start with `?`.
        let pool = MongoPool::with_options("mongodb://localhost:27017/", "test_db", "my_app", Some(8));
        assert_eq!(pool.uri, "mongodb://localhost:27017/?maxPoolSize=8");
    }

    /// Driver errors are surfaced as the `Infrastructure` variant.
    #[test]
    fn mongo_err_maps_to_infrastructure() {
        use mongodb::error::Error as MongoError;
        let err = MongoError::custom("test error");
        let mapped = mongo_err(err);
        assert!(
            matches!(mapped, RustvelloError::Infrastructure { .. }),
            "expected Infrastructure, got {:?}",
            mapped
        );
    }
}