Skip to main content

data_connector/
factory.rs

1//! Factory function to create storage backends based on configuration.
2
3use std::sync::Arc;
4
5use tracing::info;
6use url::Url;
7
8use crate::{
9    config::{HistoryBackend, OracleConfig, PostgresConfig, RedisConfig},
10    core::{ConversationItemStorage, ConversationStorage, ResponseStorage},
11    hooked::{HookedConversationItemStorage, HookedConversationStorage, HookedResponseStorage},
12    hooks::StorageHook,
13    memory::{MemoryConversationItemStorage, MemoryConversationStorage, MemoryResponseStorage},
14    noop::{NoOpConversationItemStorage, NoOpConversationStorage, NoOpResponseStorage},
15    oracle::{OracleConversationItemStorage, OracleConversationStorage, OracleResponseStorage},
16    postgres::{
17        PostgresConversationItemStorage, PostgresConversationStorage, PostgresResponseStorage,
18        PostgresStore,
19    },
20    redis::{
21        RedisConversationItemStorage, RedisConversationStorage, RedisResponseStorage, RedisStore,
22    },
23};
24
25/// Complete storage handles returned by the factory.
26pub struct StorageBundle {
27    pub response_storage: Arc<dyn ResponseStorage>,
28    pub conversation_storage: Arc<dyn ConversationStorage>,
29    pub conversation_item_storage: Arc<dyn ConversationItemStorage>,
30}
31
32/// Configuration for creating storage backends
33pub struct StorageFactoryConfig<'a> {
34    pub backend: &'a HistoryBackend,
35    pub oracle: Option<&'a OracleConfig>,
36    pub postgres: Option<&'a PostgresConfig>,
37    pub redis: Option<&'a RedisConfig>,
38    /// Optional storage hook. When provided, all three storage backends are
39    /// wrapped in `Hooked*Storage` that runs before/after hooks around every
40    /// storage operation.
41    pub hook: Option<Arc<dyn StorageHook>>,
42}
43
44/// Create all configured storage handles.
45///
46/// # Errors
47/// Returns error string if required configuration is missing or initialization fails
48pub async fn create_storage(config: StorageFactoryConfig<'_>) -> Result<StorageBundle, String> {
49    let bundle = match config.backend {
50        HistoryBackend::Memory => {
51            info!("Initializing data connector: Memory");
52            StorageBundle {
53                response_storage: Arc::new(MemoryResponseStorage::new()),
54                conversation_storage: Arc::new(MemoryConversationStorage::new()),
55                conversation_item_storage: Arc::new(MemoryConversationItemStorage::new()),
56            }
57        }
58        HistoryBackend::None => {
59            info!("Initializing data connector: None (no persistence)");
60            StorageBundle {
61                response_storage: Arc::new(NoOpResponseStorage::new()),
62                conversation_storage: Arc::new(NoOpConversationStorage::new()),
63                conversation_item_storage: Arc::new(NoOpConversationItemStorage::new()),
64            }
65        }
66        HistoryBackend::Oracle => {
67            let oracle_cfg = config
68                .oracle
69                .ok_or("oracle configuration is required when history_backend=oracle")?;
70
71            info!(
72                "Initializing data connector: Oracle ATP (pool: {}-{})",
73                oracle_cfg.pool_min, oracle_cfg.pool_max
74            );
75
76            let storages = create_oracle_storage(oracle_cfg)?;
77
78            info!("Data connector initialized successfully: Oracle ATP");
79            storages
80        }
81        HistoryBackend::Postgres => {
82            let postgres_cfg = config
83                .postgres
84                .ok_or("Postgres configuration is required when history_backend=postgres")?;
85
86            let log_db_url = match Url::parse(&postgres_cfg.db_url) {
87                Ok(mut url) => {
88                    if url.password().is_some() {
89                        let _ = url.set_password(Some("****"));
90                    }
91                    url.to_string()
92                }
93                Err(_) => "<redacted>".to_string(),
94            };
95
96            info!(
97                "Initializing data connector: Postgres (db_url: {}, pool_max: {})",
98                log_db_url, postgres_cfg.pool_max
99            );
100
101            let storages = create_postgres_storage(postgres_cfg).await?;
102
103            info!("Data connector initialized successfully: Postgres");
104            storages
105        }
106        HistoryBackend::Redis => {
107            let redis_cfg = config
108                .redis
109                .ok_or("Redis configuration is required when history_backend=redis")?;
110
111            let log_redis_url = match Url::parse(&redis_cfg.url) {
112                Ok(mut url) => {
113                    if url.password().is_some() {
114                        let _ = url.set_password(Some("****"));
115                    }
116                    url.to_string()
117                }
118                Err(_) => "<redacted>".to_string(),
119            };
120
121            info!(
122                "Initializing data connector: Redis (url: {}, pool_max: {})",
123                log_redis_url, redis_cfg.pool_max
124            );
125
126            let storages = create_redis_storage(redis_cfg)?;
127
128            info!("Data connector initialized successfully: Redis");
129            storages
130        }
131    };
132
133    // Wrap backends in hooked storage when a hook is provided.
134    if let Some(hook) = config.hook {
135        info!("Wrapping storage backends with hook");
136        Ok(StorageBundle {
137            response_storage: Arc::new(HookedResponseStorage::new(
138                bundle.response_storage,
139                hook.clone(),
140            )),
141            conversation_storage: Arc::new(HookedConversationStorage::new(
142                bundle.conversation_storage,
143                hook.clone(),
144            )),
145            conversation_item_storage: Arc::new(HookedConversationItemStorage::new(
146                bundle.conversation_item_storage,
147                hook,
148            )),
149        })
150    } else {
151        Ok(bundle)
152    }
153}
154
155/// Create Oracle storage backends with a single shared connection pool.
156fn create_oracle_storage(oracle_cfg: &OracleConfig) -> Result<StorageBundle, String> {
157    use crate::oracle::OracleStore;
158
159    let store = OracleStore::new(
160        oracle_cfg,
161        &[
162            OracleConversationStorage::init_schema,
163            OracleConversationItemStorage::init_schema,
164            OracleResponseStorage::init_schema,
165        ],
166    )?;
167
168    Ok(StorageBundle {
169        response_storage: Arc::new(OracleResponseStorage::new(store.clone())),
170        conversation_storage: Arc::new(OracleConversationStorage::new(store.clone())),
171        conversation_item_storage: Arc::new(OracleConversationItemStorage::new(store)),
172    })
173}
174
175async fn create_postgres_storage(postgres_cfg: &PostgresConfig) -> Result<StorageBundle, String> {
176    let store = PostgresStore::new(postgres_cfg.clone())?;
177    let postgres_resp = PostgresResponseStorage::new(store.clone())
178        .await
179        .map_err(|err| format!("failed to initialize Postgres response storage: {err}"))?;
180    let postgres_conv = PostgresConversationStorage::new(store.clone())
181        .await
182        .map_err(|err| format!("failed to initialize Postgres conversation storage: {err}"))?;
183    let postgres_item = PostgresConversationItemStorage::new(store.clone())
184        .await
185        .map_err(|err| format!("failed to initialize Postgres conversation item storage: {err}"))?;
186
187    // Run versioned migrations after all tables are created
188    let applied = store.run_migrations().await?;
189
190    // Re-create indexes that were deferred during init because
191    // migration-added columns did not yet exist.
192    if !applied.is_empty() {
193        store.ensure_response_indexes().await?;
194    }
195
196    Ok(StorageBundle {
197        response_storage: Arc::new(postgres_resp),
198        conversation_storage: Arc::new(postgres_conv),
199        conversation_item_storage: Arc::new(postgres_item),
200    })
201}
202
203fn create_redis_storage(redis_cfg: &RedisConfig) -> Result<StorageBundle, String> {
204    let store = RedisStore::new(redis_cfg.clone())?;
205    let redis_resp = RedisResponseStorage::new(store.clone());
206    let redis_conv = RedisConversationStorage::new(store.clone());
207    let redis_item = RedisConversationItemStorage::new(store);
208
209    Ok(StorageBundle {
210        response_storage: Arc::new(redis_resp),
211        conversation_storage: Arc::new(redis_conv),
212        conversation_item_storage: Arc::new(redis_item),
213    })
214}
215
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;
    use crate::core::{NewConversation, NewConversationItem, StoredResponse};

    /// Factory config for `backend` with no backend sections and no hook.
    fn bare_config(backend: &HistoryBackend) -> StorageFactoryConfig<'_> {
        StorageFactoryConfig {
            backend,
            oracle: None,
            postgres: None,
            redis: None,
            hook: None,
        }
    }

    /// Run the factory for `backend` without its config section and return
    /// the resulting error message.
    async fn missing_config_error(backend: &HistoryBackend) -> String {
        create_storage(bare_config(backend))
            .await
            .err()
            .expect("should fail")
    }

    /// Write one response, one conversation and one item through `bundle`
    /// and assert that each can be read back.
    async fn assert_round_trip(bundle: StorageBundle) {
        let StorageBundle {
            response_storage: resp,
            conversation_storage: conv,
            conversation_item_storage: items,
        } = bundle;

        let mut response = StoredResponse::new(None);
        response.input = json!("hello");
        let id = resp.store_response(response).await.unwrap();
        assert!(resp.get_response(&id).await.unwrap().is_some());

        let conversation = conv
            .create_conversation(NewConversation {
                id: None,
                metadata: None,
            })
            .await
            .unwrap();
        assert!(conv
            .get_conversation(&conversation.id)
            .await
            .unwrap()
            .is_some());

        let item = items
            .create_item(NewConversationItem {
                id: None,
                response_id: None,
                item_type: "message".to_string(),
                role: Some("user".to_string()),
                content: json!([]),
                status: Some("completed".to_string()),
            })
            .await
            .unwrap();
        assert!(items.get_item(&item.id).await.unwrap().is_some());
    }

    #[tokio::test]
    async fn test_create_storage_memory() {
        let bundle = create_storage(bare_config(&HistoryBackend::Memory))
            .await
            .unwrap();

        // Verify they work end-to-end
        assert_round_trip(bundle).await;
    }

    #[tokio::test]
    async fn test_create_storage_none() {
        let bundle = create_storage(bare_config(&HistoryBackend::None))
            .await
            .unwrap();
        let (resp, conv) = (bundle.response_storage, bundle.conversation_storage);

        // NoOp storage should accept writes but return nothing on reads
        let mut response = StoredResponse::new(None);
        response.input = json!("hello");
        let id = resp.store_response(response).await.unwrap();
        assert!(resp.get_response(&id).await.unwrap().is_none());
        assert!(conv
            .get_conversation(&"nonexistent".into())
            .await
            .unwrap()
            .is_none());
    }

    #[tokio::test]
    async fn test_create_storage_oracle_missing_config() {
        let err = missing_config_error(&HistoryBackend::Oracle).await;
        assert!(err.contains("oracle configuration is required"));
    }

    #[tokio::test]
    async fn test_create_storage_postgres_missing_config() {
        let err = missing_config_error(&HistoryBackend::Postgres).await;
        assert!(err.contains("Postgres configuration is required"));
    }

    #[tokio::test]
    async fn test_create_storage_redis_missing_config() {
        let err = missing_config_error(&HistoryBackend::Redis).await;
        assert!(err.contains("Redis configuration is required"));
    }

    #[tokio::test]
    async fn test_create_storage_with_hook() {
        use std::sync::Arc;

        use async_trait::async_trait;

        use crate::{
            context::RequestContext,
            hooks::{BeforeHookResult, ExtraColumns, HookError, StorageHook, StorageOperation},
        };

        /// Hook that accepts every operation and passes extra columns through.
        struct NoOpHook;

        #[async_trait]
        impl StorageHook for NoOpHook {
            async fn before(
                &self,
                _op: StorageOperation,
                _ctx: Option<&RequestContext>,
                _payload: &serde_json::Value,
            ) -> Result<BeforeHookResult, HookError> {
                Ok(BeforeHookResult::default())
            }

            async fn after(
                &self,
                _op: StorageOperation,
                _ctx: Option<&RequestContext>,
                _payload: &serde_json::Value,
                _result: &serde_json::Value,
                extra: &ExtraColumns,
            ) -> Result<ExtraColumns, HookError> {
                Ok(extra.clone())
            }
        }

        let bundle = create_storage(StorageFactoryConfig {
            hook: Some(Arc::new(NoOpHook)),
            ..bare_config(&HistoryBackend::Memory)
        })
        .await
        .unwrap();

        // Verify hooked storage works end-to-end
        assert_round_trip(bundle).await;
    }
}