Skip to main content

data_connector/
factory.rs

1//! Factory function to create storage backends based on configuration.
2
3use std::sync::Arc;
4
5use tracing::info;
6use url::Url;
7
8use crate::{
9    config::{HistoryBackend, OracleConfig, PostgresConfig, RedisConfig},
10    core::{ConversationItemStorage, ConversationStorage, ResponseStorage},
11    hooked::{HookedConversationItemStorage, HookedConversationStorage, HookedResponseStorage},
12    hooks::StorageHook,
13    memory::{MemoryConversationItemStorage, MemoryConversationStorage, MemoryResponseStorage},
14    noop::{NoOpConversationItemStorage, NoOpConversationStorage, NoOpResponseStorage},
15    oracle::{OracleConversationItemStorage, OracleConversationStorage, OracleResponseStorage},
16    postgres::{
17        PostgresConversationItemStorage, PostgresConversationStorage, PostgresResponseStorage,
18        PostgresStore,
19    },
20    redis::{
21        RedisConversationItemStorage, RedisConversationStorage, RedisResponseStorage, RedisStore,
22    },
23};
24
25/// Type alias for the storage tuple returned by factory functions.
26/// This avoids clippy::type_complexity warnings while keeping Arc explicit.
27pub type StorageTuple = (
28    Arc<dyn ResponseStorage>,
29    Arc<dyn ConversationStorage>,
30    Arc<dyn ConversationItemStorage>,
31);
32
33/// Configuration for creating storage backends
34pub struct StorageFactoryConfig<'a> {
35    pub backend: &'a HistoryBackend,
36    pub oracle: Option<&'a OracleConfig>,
37    pub postgres: Option<&'a PostgresConfig>,
38    pub redis: Option<&'a RedisConfig>,
39    /// Optional storage hook. When provided, all three storage backends are
40    /// wrapped in `Hooked*Storage` that runs before/after hooks around every
41    /// storage operation.
42    pub hook: Option<Arc<dyn StorageHook>>,
43}
44
45/// Create all three storage backends based on configuration.
46///
47/// # Arguments
48/// * `config` - Storage factory configuration
49///
50/// # Returns
51/// Tuple of (response_storage, conversation_storage, conversation_item_storage)
52///
53/// # Errors
54/// Returns error string if required configuration is missing or initialization fails
55pub async fn create_storage(config: StorageFactoryConfig<'_>) -> Result<StorageTuple, String> {
56    let (resp, conv, items): StorageTuple = match config.backend {
57        HistoryBackend::Memory => {
58            info!("Initializing data connector: Memory");
59            (
60                Arc::new(MemoryResponseStorage::new()),
61                Arc::new(MemoryConversationStorage::new()),
62                Arc::new(MemoryConversationItemStorage::new()),
63            )
64        }
65        HistoryBackend::None => {
66            info!("Initializing data connector: None (no persistence)");
67            (
68                Arc::new(NoOpResponseStorage::new()),
69                Arc::new(NoOpConversationStorage::new()),
70                Arc::new(NoOpConversationItemStorage::new()),
71            )
72        }
73        HistoryBackend::Oracle => {
74            let oracle_cfg = config
75                .oracle
76                .ok_or("oracle configuration is required when history_backend=oracle")?;
77
78            info!(
79                "Initializing data connector: Oracle ATP (pool: {}-{})",
80                oracle_cfg.pool_min, oracle_cfg.pool_max
81            );
82
83            let storages = create_oracle_storage(oracle_cfg)?;
84
85            info!("Data connector initialized successfully: Oracle ATP");
86            storages
87        }
88        HistoryBackend::Postgres => {
89            let postgres_cfg = config
90                .postgres
91                .ok_or("Postgres configuration is required when history_backend=postgres")?;
92
93            let log_db_url = match Url::parse(&postgres_cfg.db_url) {
94                Ok(mut url) => {
95                    if url.password().is_some() {
96                        let _ = url.set_password(Some("****"));
97                    }
98                    url.to_string()
99                }
100                Err(_) => "<redacted>".to_string(),
101            };
102
103            info!(
104                "Initializing data connector: Postgres (db_url: {}, pool_max: {})",
105                log_db_url, postgres_cfg.pool_max
106            );
107
108            let storages = create_postgres_storage(postgres_cfg).await?;
109
110            info!("Data connector initialized successfully: Postgres");
111            storages
112        }
113        HistoryBackend::Redis => {
114            let redis_cfg = config
115                .redis
116                .ok_or("Redis configuration is required when history_backend=redis")?;
117
118            let log_redis_url = match Url::parse(&redis_cfg.url) {
119                Ok(mut url) => {
120                    if url.password().is_some() {
121                        let _ = url.set_password(Some("****"));
122                    }
123                    url.to_string()
124                }
125                Err(_) => "<redacted>".to_string(),
126            };
127
128            info!(
129                "Initializing data connector: Redis (url: {}, pool_max: {})",
130                log_redis_url, redis_cfg.pool_max
131            );
132
133            let storages = create_redis_storage(redis_cfg)?;
134
135            info!("Data connector initialized successfully: Redis");
136            storages
137        }
138    };
139
140    // Wrap backends in hooked storage when a hook is provided
141    if let Some(hook) = config.hook {
142        info!("Wrapping storage backends with hook");
143        Ok((
144            Arc::new(HookedResponseStorage::new(resp, hook.clone())),
145            Arc::new(HookedConversationStorage::new(conv, hook.clone())),
146            Arc::new(HookedConversationItemStorage::new(items, hook)),
147        ))
148    } else {
149        Ok((resp, conv, items))
150    }
151}
152
153/// Create Oracle storage backends with a single shared connection pool.
154fn create_oracle_storage(oracle_cfg: &OracleConfig) -> Result<StorageTuple, String> {
155    use crate::oracle::OracleStore;
156
157    let store = OracleStore::new(
158        oracle_cfg,
159        &[
160            OracleConversationStorage::init_schema,
161            OracleConversationItemStorage::init_schema,
162            OracleResponseStorage::init_schema,
163        ],
164    )?;
165
166    Ok((
167        Arc::new(OracleResponseStorage::new(store.clone())),
168        Arc::new(OracleConversationStorage::new(store.clone())),
169        Arc::new(OracleConversationItemStorage::new(store)),
170    ))
171}
172
173async fn create_postgres_storage(postgres_cfg: &PostgresConfig) -> Result<StorageTuple, String> {
174    let store = PostgresStore::new(postgres_cfg.clone())?;
175    let postgres_resp = PostgresResponseStorage::new(store.clone())
176        .await
177        .map_err(|err| format!("failed to initialize Postgres response storage: {err}"))?;
178    let postgres_conv = PostgresConversationStorage::new(store.clone())
179        .await
180        .map_err(|err| format!("failed to initialize Postgres conversation storage: {err}"))?;
181    let postgres_item = PostgresConversationItemStorage::new(store.clone())
182        .await
183        .map_err(|err| format!("failed to initialize Postgres conversation item storage: {err}"))?;
184
185    // Run versioned migrations after all tables are created
186    let applied = store.run_migrations().await?;
187
188    // Re-create indexes that were deferred during init because
189    // migration-added columns did not yet exist.
190    if !applied.is_empty() {
191        store.ensure_response_indexes().await?;
192    }
193
194    Ok((
195        Arc::new(postgres_resp),
196        Arc::new(postgres_conv),
197        Arc::new(postgres_item),
198    ))
199}
200
201fn create_redis_storage(redis_cfg: &RedisConfig) -> Result<StorageTuple, String> {
202    let store = RedisStore::new(redis_cfg.clone())?;
203    let redis_resp = RedisResponseStorage::new(store.clone());
204    let redis_conv = RedisConversationStorage::new(store.clone());
205    let redis_item = RedisConversationItemStorage::new(store);
206
207    Ok((
208        Arc::new(redis_resp),
209        Arc::new(redis_conv),
210        Arc::new(redis_item),
211    ))
212}
213
214#[cfg(test)]
215mod tests {
216    use serde_json::json;
217
218    use super::*;
219    use crate::core::{NewConversation, NewConversationItem, StoredResponse};
220
221    #[tokio::test]
222    async fn test_create_storage_memory() {
223        let config = StorageFactoryConfig {
224            backend: &HistoryBackend::Memory,
225            oracle: None,
226            postgres: None,
227            redis: None,
228            hook: None,
229        };
230        let (resp, conv, items) = create_storage(config).await.unwrap();
231
232        // Verify they work end-to-end
233        let mut response = StoredResponse::new(None);
234        response.input = json!("hello");
235        let id = resp.store_response(response).await.unwrap();
236        assert!(resp.get_response(&id).await.unwrap().is_some());
237
238        let conversation = conv
239            .create_conversation(NewConversation {
240                id: None,
241                metadata: None,
242            })
243            .await
244            .unwrap();
245        assert!(conv
246            .get_conversation(&conversation.id)
247            .await
248            .unwrap()
249            .is_some());
250
251        let item = items
252            .create_item(NewConversationItem {
253                id: None,
254                response_id: None,
255                item_type: "message".to_string(),
256                role: Some("user".to_string()),
257                content: json!([]),
258                status: Some("completed".to_string()),
259            })
260            .await
261            .unwrap();
262        assert!(items.get_item(&item.id).await.unwrap().is_some());
263    }
264
265    #[tokio::test]
266    async fn test_create_storage_none() {
267        let config = StorageFactoryConfig {
268            backend: &HistoryBackend::None,
269            oracle: None,
270            postgres: None,
271            redis: None,
272            hook: None,
273        };
274        let (resp, conv, _items) = create_storage(config).await.unwrap();
275
276        // NoOp storage should accept writes but return nothing on reads
277        let mut response = StoredResponse::new(None);
278        response.input = json!("hello");
279        let id = resp.store_response(response).await.unwrap();
280        assert!(resp.get_response(&id).await.unwrap().is_none());
281        assert!(conv
282            .get_conversation(&"nonexistent".into())
283            .await
284            .unwrap()
285            .is_none());
286    }
287
288    #[tokio::test]
289    async fn test_create_storage_oracle_missing_config() {
290        let config = StorageFactoryConfig {
291            backend: &HistoryBackend::Oracle,
292            oracle: None,
293            postgres: None,
294            redis: None,
295            hook: None,
296        };
297        let err = create_storage(config).await.err().expect("should fail");
298        assert!(err.contains("oracle configuration is required"));
299    }
300
301    #[tokio::test]
302    async fn test_create_storage_postgres_missing_config() {
303        let config = StorageFactoryConfig {
304            backend: &HistoryBackend::Postgres,
305            oracle: None,
306            postgres: None,
307            redis: None,
308            hook: None,
309        };
310        let err = create_storage(config).await.err().expect("should fail");
311        assert!(err.contains("Postgres configuration is required"));
312    }
313
314    #[tokio::test]
315    async fn test_create_storage_redis_missing_config() {
316        let config = StorageFactoryConfig {
317            backend: &HistoryBackend::Redis,
318            oracle: None,
319            postgres: None,
320            redis: None,
321            hook: None,
322        };
323        let err = create_storage(config).await.err().expect("should fail");
324        assert!(err.contains("Redis configuration is required"));
325    }
326
327    #[tokio::test]
328    async fn test_create_storage_with_hook() {
329        use std::sync::Arc;
330
331        use async_trait::async_trait;
332
333        use crate::{
334            context::RequestContext,
335            hooks::{BeforeHookResult, ExtraColumns, HookError, StorageHook, StorageOperation},
336        };
337
338        struct NoOpHook;
339
340        #[async_trait]
341        impl StorageHook for NoOpHook {
342            async fn before(
343                &self,
344                _op: StorageOperation,
345                _ctx: Option<&RequestContext>,
346                _payload: &serde_json::Value,
347            ) -> Result<BeforeHookResult, HookError> {
348                Ok(BeforeHookResult::default())
349            }
350
351            async fn after(
352                &self,
353                _op: StorageOperation,
354                _ctx: Option<&RequestContext>,
355                _payload: &serde_json::Value,
356                _result: &serde_json::Value,
357                extra: &ExtraColumns,
358            ) -> Result<ExtraColumns, HookError> {
359                Ok(extra.clone())
360            }
361        }
362
363        let config = StorageFactoryConfig {
364            backend: &HistoryBackend::Memory,
365            oracle: None,
366            postgres: None,
367            redis: None,
368            hook: Some(Arc::new(NoOpHook)),
369        };
370        let (resp, conv, items) = create_storage(config).await.unwrap();
371
372        // Verify hooked storage works end-to-end
373        let mut response = StoredResponse::new(None);
374        response.input = json!("hello");
375        let id = resp.store_response(response).await.unwrap();
376        assert!(resp.get_response(&id).await.unwrap().is_some());
377
378        let conversation = conv
379            .create_conversation(NewConversation {
380                id: None,
381                metadata: None,
382            })
383            .await
384            .unwrap();
385        assert!(conv
386            .get_conversation(&conversation.id)
387            .await
388            .unwrap()
389            .is_some());
390
391        let item = items
392            .create_item(NewConversationItem {
393                id: None,
394                response_id: None,
395                item_type: "message".to_string(),
396                role: Some("user".to_string()),
397                content: json!([]),
398                status: Some("completed".to_string()),
399            })
400            .await
401            .unwrap();
402        assert!(items.get_item(&item.id).await.unwrap().is_some());
403    }
404}