rustvello_mongo/
connection.rs1use mongodb::{Client, Database};
2use tokio::sync::OnceCell;
3
4use rustvello_core::error::{RustvelloError, RustvelloResult};
5
6#[non_exhaustive]
15pub struct MongoPool {
16 uri: String,
17 db_name: String,
18 client: OnceCell<Client>,
19}
20
21impl MongoPool {
22 pub fn new(uri: &str, db_name: &str, app_id: &str) -> Self {
28 Self::with_options(uri, db_name, app_id, None)
29 }
30
31 pub fn with_options(
35 uri: &str,
36 db_name: &str,
37 app_id: &str,
38 max_pool_size: Option<u32>,
39 ) -> Self {
40 let mut uri_string = uri.to_string();
41 if let Some(size) = max_pool_size {
42 let sep = if uri_string.contains('?') { '&' } else { '?' };
44 uri_string = format!("{uri_string}{sep}maxPoolSize={size}");
45 }
46 let effective_db_name = format!("{db_name}_{app_id}");
47 Self {
48 uri: uri_string,
49 db_name: effective_db_name,
50 client: OnceCell::new(),
51 }
52 }
53
54 pub async fn db(&self) -> RustvelloResult<Database> {
60 let client = self
61 .client
62 .get_or_try_init(|| async {
63 let c = Client::with_uri_str(&self.uri).await.map_err(|e| {
64 RustvelloError::state_backend(format!("MongoDB connect: {}", e))
65 })?;
66 if let Err(e) = ensure_indexes(&c.database(&self.db_name)).await {
68 tracing::warn!("MongoDB index creation failed (non-fatal): {e}");
69 }
70 Ok(c)
71 })
72 .await?;
73 Ok(client.database(&self.db_name))
74 }
75}
76
77async fn ensure_indexes(db: &Database) -> Result<(), mongodb::error::Error> {
79 use mongodb::IndexModel;
80
81 let status_col = db.collection::<mongodb::bson::Document>("orch_status");
82 status_col
83 .create_index(
84 IndexModel::builder()
85 .keys(mongodb::bson::doc! { "task_id": 1 })
86 .build(),
87 )
88 .await?;
89 status_col
90 .create_index(
91 IndexModel::builder()
92 .keys(mongodb::bson::doc! { "call_id": 1 })
93 .build(),
94 )
95 .await?;
96 status_col
97 .create_index(
98 IndexModel::builder()
99 .keys(mongodb::bson::doc! { "status_name": 1 })
100 .build(),
101 )
102 .await?;
103
104 let cond_col = db.collection::<mongodb::bson::Document>("trg_conditions");
105 cond_col
106 .create_index(
107 IndexModel::builder()
108 .keys(mongodb::bson::doc! { "condition_type": 1 })
109 .build(),
110 )
111 .await?;
112 cond_col
113 .create_index(
114 IndexModel::builder()
115 .keys(mongodb::bson::doc! { "event_code": 1 })
116 .build(),
117 )
118 .await?;
119 cond_col
120 .create_index(
121 IndexModel::builder()
122 .keys(mongodb::bson::doc! { "task_ids": 1 })
123 .build(),
124 )
125 .await?;
126
127 let trigger_col = db.collection::<mongodb::bson::Document>("trg_definitions");
128 trigger_col
129 .create_index(
130 IndexModel::builder()
131 .keys(mongodb::bson::doc! { "task_id": 1 })
132 .build(),
133 )
134 .await?;
135 trigger_col
136 .create_index(
137 IndexModel::builder()
138 .keys(mongodb::bson::doc! { "condition_ids": 1 })
139 .build(),
140 )
141 .await?;
142
143 let history_col = db.collection::<mongodb::bson::Document>("state_history");
145 history_col
146 .create_index(
147 IndexModel::builder()
148 .keys(mongodb::bson::doc! { "runner_id": 1 })
149 .build(),
150 )
151 .await?;
152 history_col
153 .create_index(
154 IndexModel::builder()
155 .keys(mongodb::bson::doc! { "timestamp": 1 })
156 .build(),
157 )
158 .await?;
159 history_col
160 .create_index(
161 IndexModel::builder()
162 .keys(mongodb::bson::doc! { "invocation_id": 1 })
163 .build(),
164 )
165 .await?;
166
167 let runner_col = db.collection::<mongodb::bson::Document>("state_runner_contexts");
169 runner_col
170 .create_index(
171 IndexModel::builder()
172 .keys(mongodb::bson::doc! { "parent_runner_id": 1 })
173 .build(),
174 )
175 .await?;
176
177 let wf_runs_col = db.collection::<mongodb::bson::Document>("state_workflow_runs");
179 wf_runs_col
180 .create_index(
181 IndexModel::builder()
182 .keys(mongodb::bson::doc! { "workflow_type": 1 })
183 .build(),
184 )
185 .await?;
186
187 Ok(())
188}
189
190pub(crate) fn mongo_err(e: mongodb::error::Error) -> RustvelloError {
191 RustvelloError::state_backend(format!("MongoDB: {}", e))
192}
193
#[cfg(test)]
mod tests {
    use super::*;

    /// `new` keeps the URI untouched and suffixes the database name with the
    /// application id.
    #[test]
    fn pool_stores_uri_and_db_name() {
        let MongoPool { uri, db_name, .. } =
            MongoPool::new("mongodb://localhost:27017", "test_db", "my_app");

        assert_eq!(uri, "mongodb://localhost:27017");
        assert_eq!(db_name, "test_db_my_app");
    }

    /// Driver errors are surfaced as the `Infrastructure` variant.
    #[test]
    fn mongo_err_maps_to_storage() {
        let mapped = mongo_err(mongodb::error::Error::custom("test error"));
        let is_infrastructure = matches!(mapped, RustvelloError::Infrastructure { .. });

        assert!(
            is_infrastructure,
            "expected Infrastructure, got {:?}",
            mapped
        );
    }
}