1use std::sync::Arc;
4
5use tracing::info;
6use url::Url;
7
8use crate::{
9 config::{HistoryBackend, OracleConfig, PostgresConfig, RedisConfig},
10 core::{ConversationItemStorage, ConversationStorage, ResponseStorage},
11 hooked::{HookedConversationItemStorage, HookedConversationStorage, HookedResponseStorage},
12 hooks::StorageHook,
13 memory::{MemoryConversationItemStorage, MemoryConversationStorage, MemoryResponseStorage},
14 noop::{NoOpConversationItemStorage, NoOpConversationStorage, NoOpResponseStorage},
15 oracle::{OracleConversationItemStorage, OracleConversationStorage, OracleResponseStorage},
16 postgres::{
17 PostgresConversationItemStorage, PostgresConversationStorage, PostgresResponseStorage,
18 PostgresStore,
19 },
20 redis::{
21 RedisConversationItemStorage, RedisConversationStorage, RedisResponseStorage, RedisStore,
22 },
23};
24
/// The three storage handles produced by [`create_storage`].
///
/// Every handle is a shared trait object, so callers do not depend on which
/// backend was selected.
pub struct StorageBundle {
    /// Handle to the response storage.
    pub response_storage: Arc<dyn ResponseStorage>,
    /// Handle to the conversation storage.
    pub conversation_storage: Arc<dyn ConversationStorage>,
    /// Handle to the conversation item storage.
    pub conversation_item_storage: Arc<dyn ConversationItemStorage>,
}
31
/// Borrowed inputs for [`create_storage`].
pub struct StorageFactoryConfig<'a> {
    /// Which backend to build.
    pub backend: &'a HistoryBackend,
    /// Oracle settings; must be `Some` when `backend` is `HistoryBackend::Oracle`.
    pub oracle: Option<&'a OracleConfig>,
    /// Postgres settings; must be `Some` when `backend` is `HistoryBackend::Postgres`.
    pub postgres: Option<&'a PostgresConfig>,
    /// Redis settings; must be `Some` when `backend` is `HistoryBackend::Redis`.
    pub redis: Option<&'a RedisConfig>,
    /// Optional hook; when present, all three storages returned by
    /// `create_storage` are wrapped in their `Hooked*` counterparts.
    pub hook: Option<Arc<dyn StorageHook>>,
}
43
44pub async fn create_storage(config: StorageFactoryConfig<'_>) -> Result<StorageBundle, String> {
49 let bundle = match config.backend {
50 HistoryBackend::Memory => {
51 info!("Initializing data connector: Memory");
52 StorageBundle {
53 response_storage: Arc::new(MemoryResponseStorage::new()),
54 conversation_storage: Arc::new(MemoryConversationStorage::new()),
55 conversation_item_storage: Arc::new(MemoryConversationItemStorage::new()),
56 }
57 }
58 HistoryBackend::None => {
59 info!("Initializing data connector: None (no persistence)");
60 StorageBundle {
61 response_storage: Arc::new(NoOpResponseStorage::new()),
62 conversation_storage: Arc::new(NoOpConversationStorage::new()),
63 conversation_item_storage: Arc::new(NoOpConversationItemStorage::new()),
64 }
65 }
66 HistoryBackend::Oracle => {
67 let oracle_cfg = config
68 .oracle
69 .ok_or("oracle configuration is required when history_backend=oracle")?;
70
71 info!(
72 "Initializing data connector: Oracle ATP (pool: {}-{})",
73 oracle_cfg.pool_min, oracle_cfg.pool_max
74 );
75
76 let storages = create_oracle_storage(oracle_cfg)?;
77
78 info!("Data connector initialized successfully: Oracle ATP");
79 storages
80 }
81 HistoryBackend::Postgres => {
82 let postgres_cfg = config
83 .postgres
84 .ok_or("Postgres configuration is required when history_backend=postgres")?;
85
86 let log_db_url = match Url::parse(&postgres_cfg.db_url) {
87 Ok(mut url) => {
88 if url.password().is_some() {
89 let _ = url.set_password(Some("****"));
90 }
91 url.to_string()
92 }
93 Err(_) => "<redacted>".to_string(),
94 };
95
96 info!(
97 "Initializing data connector: Postgres (db_url: {}, pool_max: {})",
98 log_db_url, postgres_cfg.pool_max
99 );
100
101 let storages = create_postgres_storage(postgres_cfg).await?;
102
103 info!("Data connector initialized successfully: Postgres");
104 storages
105 }
106 HistoryBackend::Redis => {
107 let redis_cfg = config
108 .redis
109 .ok_or("Redis configuration is required when history_backend=redis")?;
110
111 let log_redis_url = match Url::parse(&redis_cfg.url) {
112 Ok(mut url) => {
113 if url.password().is_some() {
114 let _ = url.set_password(Some("****"));
115 }
116 url.to_string()
117 }
118 Err(_) => "<redacted>".to_string(),
119 };
120
121 info!(
122 "Initializing data connector: Redis (url: {}, pool_max: {})",
123 log_redis_url, redis_cfg.pool_max
124 );
125
126 let storages = create_redis_storage(redis_cfg)?;
127
128 info!("Data connector initialized successfully: Redis");
129 storages
130 }
131 };
132
133 if let Some(hook) = config.hook {
135 info!("Wrapping storage backends with hook");
136 Ok(StorageBundle {
137 response_storage: Arc::new(HookedResponseStorage::new(
138 bundle.response_storage,
139 hook.clone(),
140 )),
141 conversation_storage: Arc::new(HookedConversationStorage::new(
142 bundle.conversation_storage,
143 hook.clone(),
144 )),
145 conversation_item_storage: Arc::new(HookedConversationItemStorage::new(
146 bundle.conversation_item_storage,
147 hook,
148 )),
149 })
150 } else {
151 Ok(bundle)
152 }
153}
154
155fn create_oracle_storage(oracle_cfg: &OracleConfig) -> Result<StorageBundle, String> {
157 use crate::oracle::OracleStore;
158
159 let store = OracleStore::new(
160 oracle_cfg,
161 &[
162 OracleConversationStorage::init_schema,
163 OracleConversationItemStorage::init_schema,
164 OracleResponseStorage::init_schema,
165 ],
166 )?;
167
168 Ok(StorageBundle {
169 response_storage: Arc::new(OracleResponseStorage::new(store.clone())),
170 conversation_storage: Arc::new(OracleConversationStorage::new(store.clone())),
171 conversation_item_storage: Arc::new(OracleConversationItemStorage::new(store)),
172 })
173}
174
175async fn create_postgres_storage(postgres_cfg: &PostgresConfig) -> Result<StorageBundle, String> {
176 let store = PostgresStore::new(postgres_cfg.clone())?;
177 let postgres_resp = PostgresResponseStorage::new(store.clone())
178 .await
179 .map_err(|err| format!("failed to initialize Postgres response storage: {err}"))?;
180 let postgres_conv = PostgresConversationStorage::new(store.clone())
181 .await
182 .map_err(|err| format!("failed to initialize Postgres conversation storage: {err}"))?;
183 let postgres_item = PostgresConversationItemStorage::new(store.clone())
184 .await
185 .map_err(|err| format!("failed to initialize Postgres conversation item storage: {err}"))?;
186
187 let applied = store.run_migrations().await?;
189
190 if !applied.is_empty() {
193 store.ensure_response_indexes().await?;
194 }
195
196 Ok(StorageBundle {
197 response_storage: Arc::new(postgres_resp),
198 conversation_storage: Arc::new(postgres_conv),
199 conversation_item_storage: Arc::new(postgres_item),
200 })
201}
202
203fn create_redis_storage(redis_cfg: &RedisConfig) -> Result<StorageBundle, String> {
204 let store = RedisStore::new(redis_cfg.clone())?;
205 let redis_resp = RedisResponseStorage::new(store.clone());
206 let redis_conv = RedisConversationStorage::new(store.clone());
207 let redis_item = RedisConversationItemStorage::new(store);
208
209 Ok(StorageBundle {
210 response_storage: Arc::new(redis_resp),
211 conversation_storage: Arc::new(redis_conv),
212 conversation_item_storage: Arc::new(redis_item),
213 })
214}
215
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;
    use crate::core::{NewConversation, NewConversationItem, StoredResponse};

    /// Builds a factory config that selects `backend` but carries no
    /// backend-specific settings.
    fn bare_config(
        backend: &HistoryBackend,
        hook: Option<Arc<dyn StorageHook>>,
    ) -> StorageFactoryConfig<'_> {
        StorageFactoryConfig {
            backend,
            oracle: None,
            postgres: None,
            redis: None,
            hook,
        }
    }

    /// Runs `create_storage` for `backend` without its settings and returns
    /// the resulting error message.
    async fn missing_config_error(backend: &HistoryBackend) -> String {
        // `.err().expect(..)` is used because it does not require the `Ok`
        // type to implement `Debug`.
        create_storage(bare_config(backend, None))
            .await
            .err()
            .expect("should fail")
    }

    /// Stores one response, one conversation and one conversation item, and
    /// asserts each can be read back.
    async fn assert_round_trip(bundle: StorageBundle) {
        let StorageBundle {
            response_storage,
            conversation_storage,
            conversation_item_storage,
        } = bundle;

        let mut response = StoredResponse::new(None);
        response.input = json!("hello");
        let response_id = response_storage.store_response(response).await.unwrap();
        assert!(response_storage
            .get_response(&response_id)
            .await
            .unwrap()
            .is_some());

        let conversation = conversation_storage
            .create_conversation(NewConversation {
                id: None,
                metadata: None,
            })
            .await
            .unwrap();
        assert!(conversation_storage
            .get_conversation(&conversation.id)
            .await
            .unwrap()
            .is_some());

        let item = conversation_item_storage
            .create_item(NewConversationItem {
                id: None,
                response_id: None,
                item_type: "message".to_string(),
                role: Some("user".to_string()),
                content: json!([]),
                status: Some("completed".to_string()),
            })
            .await
            .unwrap();
        assert!(conversation_item_storage
            .get_item(&item.id)
            .await
            .unwrap()
            .is_some());
    }

    #[tokio::test]
    async fn test_create_storage_memory() {
        let bundle = create_storage(bare_config(&HistoryBackend::Memory, None))
            .await
            .unwrap();
        assert_round_trip(bundle).await;
    }

    #[tokio::test]
    async fn test_create_storage_none() {
        let bundle = create_storage(bare_config(&HistoryBackend::None, None))
            .await
            .unwrap();
        let responses = bundle.response_storage;
        let conversations = bundle.conversation_storage;

        // With the no-op backend a store call succeeds but nothing can be
        // read back afterwards.
        let mut response = StoredResponse::new(None);
        response.input = json!("hello");
        let response_id = responses.store_response(response).await.unwrap();
        assert!(responses
            .get_response(&response_id)
            .await
            .unwrap()
            .is_none());
        assert!(conversations
            .get_conversation(&"nonexistent".into())
            .await
            .unwrap()
            .is_none());
    }

    #[tokio::test]
    async fn test_create_storage_oracle_missing_config() {
        let err = missing_config_error(&HistoryBackend::Oracle).await;
        assert!(err.contains("oracle configuration is required"));
    }

    #[tokio::test]
    async fn test_create_storage_postgres_missing_config() {
        let err = missing_config_error(&HistoryBackend::Postgres).await;
        assert!(err.contains("Postgres configuration is required"));
    }

    #[tokio::test]
    async fn test_create_storage_redis_missing_config() {
        let err = missing_config_error(&HistoryBackend::Redis).await;
        assert!(err.contains("Redis configuration is required"));
    }

    #[tokio::test]
    async fn test_create_storage_with_hook() {
        use std::sync::Arc;

        use async_trait::async_trait;

        use crate::{
            context::RequestContext,
            hooks::{BeforeHookResult, ExtraColumns, HookError, StorageHook, StorageOperation},
        };

        /// Hook that returns the default `before` result and hands the extra
        /// columns back unchanged in `after`.
        struct NoOpHook;

        #[async_trait]
        impl StorageHook for NoOpHook {
            async fn before(
                &self,
                _op: StorageOperation,
                _ctx: Option<&RequestContext>,
                _payload: &serde_json::Value,
            ) -> Result<BeforeHookResult, HookError> {
                Ok(BeforeHookResult::default())
            }

            async fn after(
                &self,
                _op: StorageOperation,
                _ctx: Option<&RequestContext>,
                _payload: &serde_json::Value,
                _result: &serde_json::Value,
                extra: &ExtraColumns,
            ) -> Result<ExtraColumns, HookError> {
                Ok(extra.clone())
            }
        }

        let bundle = create_storage(bare_config(
            &HistoryBackend::Memory,
            Some(Arc::new(NoOpHook)),
        ))
        .await
        .unwrap();

        // The hooked wrappers must still support the full round trip.
        assert_round_trip(bundle).await;
    }
}