1use std::sync::Arc;
4
5use tracing::info;
6use url::Url;
7
8use crate::{
9 config::{HistoryBackend, OracleConfig, PostgresConfig, RedisConfig},
10 core::{ConversationItemStorage, ConversationStorage, ResponseStorage},
11 hooked::{HookedConversationItemStorage, HookedConversationStorage, HookedResponseStorage},
12 hooks::StorageHook,
13 memory::{MemoryConversationItemStorage, MemoryConversationStorage, MemoryResponseStorage},
14 noop::{NoOpConversationItemStorage, NoOpConversationStorage, NoOpResponseStorage},
15 oracle::{OracleConversationItemStorage, OracleConversationStorage, OracleResponseStorage},
16 postgres::{
17 PostgresConversationItemStorage, PostgresConversationStorage, PostgresResponseStorage,
18 PostgresStore,
19 },
20 redis::{
21 RedisConversationItemStorage, RedisConversationStorage, RedisResponseStorage, RedisStore,
22 },
23};
24
25pub type StorageTuple = (
28 Arc<dyn ResponseStorage>,
29 Arc<dyn ConversationStorage>,
30 Arc<dyn ConversationItemStorage>,
31);
32
33pub struct StorageFactoryConfig<'a> {
35 pub backend: &'a HistoryBackend,
36 pub oracle: Option<&'a OracleConfig>,
37 pub postgres: Option<&'a PostgresConfig>,
38 pub redis: Option<&'a RedisConfig>,
39 pub hook: Option<Arc<dyn StorageHook>>,
43}
44
45pub async fn create_storage(config: StorageFactoryConfig<'_>) -> Result<StorageTuple, String> {
56 let (resp, conv, items): StorageTuple = match config.backend {
57 HistoryBackend::Memory => {
58 info!("Initializing data connector: Memory");
59 (
60 Arc::new(MemoryResponseStorage::new()),
61 Arc::new(MemoryConversationStorage::new()),
62 Arc::new(MemoryConversationItemStorage::new()),
63 )
64 }
65 HistoryBackend::None => {
66 info!("Initializing data connector: None (no persistence)");
67 (
68 Arc::new(NoOpResponseStorage::new()),
69 Arc::new(NoOpConversationStorage::new()),
70 Arc::new(NoOpConversationItemStorage::new()),
71 )
72 }
73 HistoryBackend::Oracle => {
74 let oracle_cfg = config
75 .oracle
76 .ok_or("oracle configuration is required when history_backend=oracle")?;
77
78 info!(
79 "Initializing data connector: Oracle ATP (pool: {}-{})",
80 oracle_cfg.pool_min, oracle_cfg.pool_max
81 );
82
83 let storages = create_oracle_storage(oracle_cfg)?;
84
85 info!("Data connector initialized successfully: Oracle ATP");
86 storages
87 }
88 HistoryBackend::Postgres => {
89 let postgres_cfg = config
90 .postgres
91 .ok_or("Postgres configuration is required when history_backend=postgres")?;
92
93 let log_db_url = match Url::parse(&postgres_cfg.db_url) {
94 Ok(mut url) => {
95 if url.password().is_some() {
96 let _ = url.set_password(Some("****"));
97 }
98 url.to_string()
99 }
100 Err(_) => "<redacted>".to_string(),
101 };
102
103 info!(
104 "Initializing data connector: Postgres (db_url: {}, pool_max: {})",
105 log_db_url, postgres_cfg.pool_max
106 );
107
108 let storages = create_postgres_storage(postgres_cfg).await?;
109
110 info!("Data connector initialized successfully: Postgres");
111 storages
112 }
113 HistoryBackend::Redis => {
114 let redis_cfg = config
115 .redis
116 .ok_or("Redis configuration is required when history_backend=redis")?;
117
118 let log_redis_url = match Url::parse(&redis_cfg.url) {
119 Ok(mut url) => {
120 if url.password().is_some() {
121 let _ = url.set_password(Some("****"));
122 }
123 url.to_string()
124 }
125 Err(_) => "<redacted>".to_string(),
126 };
127
128 info!(
129 "Initializing data connector: Redis (url: {}, pool_max: {})",
130 log_redis_url, redis_cfg.pool_max
131 );
132
133 let storages = create_redis_storage(redis_cfg)?;
134
135 info!("Data connector initialized successfully: Redis");
136 storages
137 }
138 };
139
140 if let Some(hook) = config.hook {
142 info!("Wrapping storage backends with hook");
143 Ok((
144 Arc::new(HookedResponseStorage::new(resp, hook.clone())),
145 Arc::new(HookedConversationStorage::new(conv, hook.clone())),
146 Arc::new(HookedConversationItemStorage::new(items, hook)),
147 ))
148 } else {
149 Ok((resp, conv, items))
150 }
151}
152
153fn create_oracle_storage(oracle_cfg: &OracleConfig) -> Result<StorageTuple, String> {
155 use crate::oracle::OracleStore;
156
157 let store = OracleStore::new(
158 oracle_cfg,
159 &[
160 OracleConversationStorage::init_schema,
161 OracleConversationItemStorage::init_schema,
162 OracleResponseStorage::init_schema,
163 ],
164 )?;
165
166 Ok((
167 Arc::new(OracleResponseStorage::new(store.clone())),
168 Arc::new(OracleConversationStorage::new(store.clone())),
169 Arc::new(OracleConversationItemStorage::new(store)),
170 ))
171}
172
173async fn create_postgres_storage(postgres_cfg: &PostgresConfig) -> Result<StorageTuple, String> {
174 let store = PostgresStore::new(postgres_cfg.clone())?;
175 let postgres_resp = PostgresResponseStorage::new(store.clone())
176 .await
177 .map_err(|err| format!("failed to initialize Postgres response storage: {err}"))?;
178 let postgres_conv = PostgresConversationStorage::new(store.clone())
179 .await
180 .map_err(|err| format!("failed to initialize Postgres conversation storage: {err}"))?;
181 let postgres_item = PostgresConversationItemStorage::new(store.clone())
182 .await
183 .map_err(|err| format!("failed to initialize Postgres conversation item storage: {err}"))?;
184
185 let applied = store.run_migrations().await?;
187
188 if !applied.is_empty() {
191 store.ensure_response_indexes().await?;
192 }
193
194 Ok((
195 Arc::new(postgres_resp),
196 Arc::new(postgres_conv),
197 Arc::new(postgres_item),
198 ))
199}
200
201fn create_redis_storage(redis_cfg: &RedisConfig) -> Result<StorageTuple, String> {
202 let store = RedisStore::new(redis_cfg.clone())?;
203 let redis_resp = RedisResponseStorage::new(store.clone());
204 let redis_conv = RedisConversationStorage::new(store.clone());
205 let redis_item = RedisConversationItemStorage::new(store);
206
207 Ok((
208 Arc::new(redis_resp),
209 Arc::new(redis_conv),
210 Arc::new(redis_item),
211 ))
212}
213
214#[cfg(test)]
215mod tests {
216 use serde_json::json;
217
218 use super::*;
219 use crate::core::{NewConversation, NewConversationItem, StoredResponse};
220
221 #[tokio::test]
222 async fn test_create_storage_memory() {
223 let config = StorageFactoryConfig {
224 backend: &HistoryBackend::Memory,
225 oracle: None,
226 postgres: None,
227 redis: None,
228 hook: None,
229 };
230 let (resp, conv, items) = create_storage(config).await.unwrap();
231
232 let mut response = StoredResponse::new(None);
234 response.input = json!("hello");
235 let id = resp.store_response(response).await.unwrap();
236 assert!(resp.get_response(&id).await.unwrap().is_some());
237
238 let conversation = conv
239 .create_conversation(NewConversation {
240 id: None,
241 metadata: None,
242 })
243 .await
244 .unwrap();
245 assert!(conv
246 .get_conversation(&conversation.id)
247 .await
248 .unwrap()
249 .is_some());
250
251 let item = items
252 .create_item(NewConversationItem {
253 id: None,
254 response_id: None,
255 item_type: "message".to_string(),
256 role: Some("user".to_string()),
257 content: json!([]),
258 status: Some("completed".to_string()),
259 })
260 .await
261 .unwrap();
262 assert!(items.get_item(&item.id).await.unwrap().is_some());
263 }
264
265 #[tokio::test]
266 async fn test_create_storage_none() {
267 let config = StorageFactoryConfig {
268 backend: &HistoryBackend::None,
269 oracle: None,
270 postgres: None,
271 redis: None,
272 hook: None,
273 };
274 let (resp, conv, _items) = create_storage(config).await.unwrap();
275
276 let mut response = StoredResponse::new(None);
278 response.input = json!("hello");
279 let id = resp.store_response(response).await.unwrap();
280 assert!(resp.get_response(&id).await.unwrap().is_none());
281 assert!(conv
282 .get_conversation(&"nonexistent".into())
283 .await
284 .unwrap()
285 .is_none());
286 }
287
288 #[tokio::test]
289 async fn test_create_storage_oracle_missing_config() {
290 let config = StorageFactoryConfig {
291 backend: &HistoryBackend::Oracle,
292 oracle: None,
293 postgres: None,
294 redis: None,
295 hook: None,
296 };
297 let err = create_storage(config).await.err().expect("should fail");
298 assert!(err.contains("oracle configuration is required"));
299 }
300
301 #[tokio::test]
302 async fn test_create_storage_postgres_missing_config() {
303 let config = StorageFactoryConfig {
304 backend: &HistoryBackend::Postgres,
305 oracle: None,
306 postgres: None,
307 redis: None,
308 hook: None,
309 };
310 let err = create_storage(config).await.err().expect("should fail");
311 assert!(err.contains("Postgres configuration is required"));
312 }
313
314 #[tokio::test]
315 async fn test_create_storage_redis_missing_config() {
316 let config = StorageFactoryConfig {
317 backend: &HistoryBackend::Redis,
318 oracle: None,
319 postgres: None,
320 redis: None,
321 hook: None,
322 };
323 let err = create_storage(config).await.err().expect("should fail");
324 assert!(err.contains("Redis configuration is required"));
325 }
326
327 #[tokio::test]
328 async fn test_create_storage_with_hook() {
329 use std::sync::Arc;
330
331 use async_trait::async_trait;
332
333 use crate::{
334 context::RequestContext,
335 hooks::{BeforeHookResult, ExtraColumns, HookError, StorageHook, StorageOperation},
336 };
337
338 struct NoOpHook;
339
340 #[async_trait]
341 impl StorageHook for NoOpHook {
342 async fn before(
343 &self,
344 _op: StorageOperation,
345 _ctx: Option<&RequestContext>,
346 _payload: &serde_json::Value,
347 ) -> Result<BeforeHookResult, HookError> {
348 Ok(BeforeHookResult::default())
349 }
350
351 async fn after(
352 &self,
353 _op: StorageOperation,
354 _ctx: Option<&RequestContext>,
355 _payload: &serde_json::Value,
356 _result: &serde_json::Value,
357 extra: &ExtraColumns,
358 ) -> Result<ExtraColumns, HookError> {
359 Ok(extra.clone())
360 }
361 }
362
363 let config = StorageFactoryConfig {
364 backend: &HistoryBackend::Memory,
365 oracle: None,
366 postgres: None,
367 redis: None,
368 hook: Some(Arc::new(NoOpHook)),
369 };
370 let (resp, conv, items) = create_storage(config).await.unwrap();
371
372 let mut response = StoredResponse::new(None);
374 response.input = json!("hello");
375 let id = resp.store_response(response).await.unwrap();
376 assert!(resp.get_response(&id).await.unwrap().is_some());
377
378 let conversation = conv
379 .create_conversation(NewConversation {
380 id: None,
381 metadata: None,
382 })
383 .await
384 .unwrap();
385 assert!(conv
386 .get_conversation(&conversation.id)
387 .await
388 .unwrap()
389 .is_some());
390
391 let item = items
392 .create_item(NewConversationItem {
393 id: None,
394 response_id: None,
395 item_type: "message".to_string(),
396 role: Some("user".to_string()),
397 content: json!([]),
398 status: Some("completed".to_string()),
399 })
400 .await
401 .unwrap();
402 assert!(items.get_item(&item.id).await.unwrap().is_some());
403 }
404}