1use std::future::Future;
30
31use anyhow::{anyhow, Result};
32use chrono::{DateTime, Utc};
33use deadpool_postgres::Pool;
34use tokio::runtime::Handle;
35
36use smooth_operator::connector_config::{ConnectorConfig, ConnectorConfigStore, ConnectorKind};
37use smooth_operator::settings::{AgentSettings, SettingsStore};
38use smooth_operator_ingestion::indexing::{IndexingRun, IndexingRunStatus, IndexingStore};
39use smooth_operator_ingestion::Timestamp;
40
41fn run_blocking<F, T>(handle: &Handle, fut: F) -> Result<T>
50where
51 F: Future<Output = Result<T>> + Send + 'static,
52 T: Send + 'static,
53{
54 let join = handle.spawn(fut);
55 let (tx, rx) = std::sync::mpsc::channel();
56 std::thread::spawn(move || {
57 let result = (|| -> Result<T> {
58 let rt = tokio::runtime::Builder::new_current_thread()
59 .enable_all()
60 .build()?;
61 let joined = rt.block_on(join);
62 joined.map_err(|e| anyhow!("admin store task panicked or was cancelled: {e}"))?
63 })();
64 let _ = tx.send(result);
65 });
66 rx.recv()
67 .map_err(|e| anyhow!("admin store task channel closed: {e}"))?
68}
69
70#[derive(Clone)]
76pub struct PgConnectorConfigStore {
77 pool: Pool,
78 handle: Handle,
79}
80
81impl PgConnectorConfigStore {
82 #[must_use]
84 pub fn new(pool: Pool, handle: Handle) -> Self {
85 Self { pool, handle }
86 }
87
88 async fn list_async(&self, org_id: String) -> Result<Vec<ConnectorConfig>> {
89 let client = self.pool.get().await?;
90 let rows = client
91 .query(
92 "SELECT id, org_id, name, kind, config, enabled, created_at, updated_at
93 FROM connector_configs
94 WHERE org_id = $1
95 ORDER BY name, id",
96 &[&org_id],
97 )
98 .await?;
99 rows.iter().map(row_to_connector).collect()
100 }
101
102 async fn get_async(&self, org_id: String, id: String) -> Result<Option<ConnectorConfig>> {
103 let client = self.pool.get().await?;
104 let row = client
105 .query_opt(
106 "SELECT id, org_id, name, kind, config, enabled, created_at, updated_at
107 FROM connector_configs
108 WHERE org_id = $1 AND id = $2",
109 &[&org_id, &id],
110 )
111 .await?;
112 row.as_ref().map(row_to_connector).transpose()
113 }
114
115 async fn upsert_async(&self, cfg: ConnectorConfig) -> Result<()> {
116 let client = self.pool.get().await?;
117 client
118 .execute(
119 "INSERT INTO connector_configs
120 (org_id, id, name, kind, config, enabled, created_at, updated_at)
121 VALUES ($1,$2,$3,$4,$5,$6,$7,$8)
122 ON CONFLICT (org_id, id) DO UPDATE SET
123 name = EXCLUDED.name,
124 kind = EXCLUDED.kind,
125 config = EXCLUDED.config,
126 enabled = EXCLUDED.enabled,
127 created_at = EXCLUDED.created_at,
128 updated_at = EXCLUDED.updated_at",
129 &[
130 &cfg.org_id,
131 &cfg.id,
132 &cfg.name,
133 &cfg.kind.as_str(),
134 &cfg.config,
135 &cfg.enabled,
136 &cfg.created_at,
137 &cfg.updated_at,
138 ],
139 )
140 .await?;
141 Ok(())
142 }
143
144 async fn delete_async(&self, org_id: String, id: String) -> Result<bool> {
145 let client = self.pool.get().await?;
146 let n = client
147 .execute(
148 "DELETE FROM connector_configs WHERE org_id = $1 AND id = $2",
149 &[&org_id, &id],
150 )
151 .await?;
152 Ok(n > 0)
153 }
154}
155
156fn row_to_connector(row: &tokio_postgres::Row) -> Result<ConnectorConfig> {
157 let kind_str: String = row.get("kind");
158 let kind = ConnectorKind::parse(&kind_str)
159 .map_err(|bad| anyhow!("unknown connector kind '{bad}' in connector_configs row"))?;
160 Ok(ConnectorConfig {
161 id: row.get("id"),
162 org_id: row.get("org_id"),
163 name: row.get("name"),
164 kind,
165 config: row.get("config"),
166 enabled: row.get("enabled"),
167 created_at: row.get("created_at"),
168 updated_at: row.get("updated_at"),
169 })
170}
171
172impl ConnectorConfigStore for PgConnectorConfigStore {
173 fn list(&self, org_id: &str) -> Vec<ConnectorConfig> {
174 let this = self.clone();
175 let org_id = org_id.to_string();
176 run_blocking(&self.handle, async move { this.list_async(org_id).await }).unwrap_or_default()
177 }
178
179 fn get(&self, org_id: &str, id: &str) -> Option<ConnectorConfig> {
180 let this = self.clone();
181 let org_id = org_id.to_string();
182 let id = id.to_string();
183 run_blocking(
184 &self.handle,
185 async move { this.get_async(org_id, id).await },
186 )
187 .ok()
188 .flatten()
189 }
190
191 fn upsert(&self, config: ConnectorConfig) {
192 let this = self.clone();
193 let _ = run_blocking(&self.handle, async move { this.upsert_async(config).await });
194 }
195
196 fn delete(&self, org_id: &str, id: &str) -> bool {
197 let this = self.clone();
198 let org_id = org_id.to_string();
199 let id = id.to_string();
200 run_blocking(
201 &self.handle,
202 async move { this.delete_async(org_id, id).await },
203 )
204 .unwrap_or(false)
205 }
206}
207
208#[derive(Clone)]
214pub struct PgSettingsStore {
215 pool: Pool,
216 handle: Handle,
217}
218
219impl PgSettingsStore {
220 #[must_use]
222 pub fn new(pool: Pool, handle: Handle) -> Self {
223 Self { pool, handle }
224 }
225
226 async fn get_async(&self, org_id: String) -> Result<Option<AgentSettings>> {
227 let client = self.pool.get().await?;
228 let row = client
229 .query_opt(
230 "SELECT org_id, model, system_prompt, default_tools, updated_at
231 FROM agent_settings WHERE org_id = $1",
232 &[&org_id],
233 )
234 .await?;
235 match row {
236 Some(row) => {
237 let default_tools: serde_json::Value = row.get("default_tools");
238 let default_tools: Vec<String> = serde_json::from_value(default_tools)?;
239 Ok(Some(AgentSettings {
240 org_id: row.get("org_id"),
241 model: row.get("model"),
242 system_prompt: row.get("system_prompt"),
243 persona: None,
248 default_tools,
249 updated_at: row.get("updated_at"),
250 }))
251 }
252 None => Ok(None),
253 }
254 }
255
256 async fn put_async(&self, settings: AgentSettings) -> Result<()> {
257 let client = self.pool.get().await?;
258 let default_tools = serde_json::to_value(&settings.default_tools)?;
259 client
260 .execute(
261 "INSERT INTO agent_settings
262 (org_id, model, system_prompt, default_tools, updated_at)
263 VALUES ($1,$2,$3,$4,$5)
264 ON CONFLICT (org_id) DO UPDATE SET
265 model = EXCLUDED.model,
266 system_prompt = EXCLUDED.system_prompt,
267 default_tools = EXCLUDED.default_tools,
268 updated_at = EXCLUDED.updated_at",
269 &[
270 &settings.org_id,
271 &settings.model,
272 &settings.system_prompt,
273 &default_tools,
274 &settings.updated_at,
275 ],
276 )
277 .await?;
278 Ok(())
279 }
280}
281
282impl SettingsStore for PgSettingsStore {
283 fn get(&self, org_id: &str) -> AgentSettings {
284 let this = self.clone();
285 let org = org_id.to_string();
286 run_blocking(&self.handle, async move { this.get_async(org).await })
289 .ok()
290 .flatten()
291 .unwrap_or_else(|| AgentSettings::defaults(org_id))
292 }
293
294 fn put(&self, settings: AgentSettings) {
295 let this = self.clone();
296 let _ = run_blocking(&self.handle, async move { this.put_async(settings).await });
297 }
298}
299
300#[derive(Clone)]
306pub struct PgIndexingStore {
307 pool: Pool,
308 handle: Handle,
309}
310
311impl PgIndexingStore {
312 #[must_use]
314 pub fn new(pool: Pool, handle: Handle) -> Self {
315 Self { pool, handle }
316 }
317
318 async fn record_run_async(&self, run: IndexingRun) -> Result<()> {
319 let client = self.pool.get().await?;
320 let status = status_to_str(run.status);
321 let documents_seen = i64::try_from(run.documents_seen).unwrap_or(i64::MAX);
322 let chunks_indexed = i64::try_from(run.chunks_indexed).unwrap_or(i64::MAX);
323 let documents_skipped = i64::try_from(run.documents_skipped).unwrap_or(i64::MAX);
324 client
325 .execute(
326 "INSERT INTO indexing_runs
327 (id, connector_name, status, started_at, finished_at,
328 documents_seen, chunks_indexed, documents_skipped, cursor, error)
329 VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10)
330 ON CONFLICT (id) DO UPDATE SET
331 connector_name = EXCLUDED.connector_name,
332 status = EXCLUDED.status,
333 started_at = EXCLUDED.started_at,
334 finished_at = EXCLUDED.finished_at,
335 documents_seen = EXCLUDED.documents_seen,
336 chunks_indexed = EXCLUDED.chunks_indexed,
337 documents_skipped = EXCLUDED.documents_skipped,
338 cursor = EXCLUDED.cursor,
339 error = EXCLUDED.error",
340 &[
341 &run.id,
342 &run.connector_name,
343 &status,
344 &run.started_at,
345 &run.finished_at,
346 &documents_seen,
347 &chunks_indexed,
348 &documents_skipped,
349 &run.cursor,
350 &run.error,
351 ],
352 )
353 .await?;
354 Ok(())
355 }
356
357 async fn latest_cursor_async(&self, connector_name: String) -> Result<Option<Timestamp>> {
358 let client = self.pool.get().await?;
359 let row = client
361 .query_one(
362 "SELECT max(cursor) AS c
363 FROM indexing_runs
364 WHERE connector_name = $1 AND status = 'succeeded'",
365 &[&connector_name],
366 )
367 .await?;
368 Ok(row.get::<_, Option<DateTime<Utc>>>("c"))
369 }
370
371 async fn list_runs_async(&self, connector_name: String) -> Result<Vec<IndexingRun>> {
372 let client = self.pool.get().await?;
373 let rows = client
375 .query(
376 "SELECT id, connector_name, status, started_at, finished_at,
377 documents_seen, chunks_indexed, documents_skipped, cursor, error
378 FROM indexing_runs
379 WHERE connector_name = $1
380 ORDER BY started_at ASC, id ASC",
381 &[&connector_name],
382 )
383 .await?;
384 rows.iter().map(row_to_run).collect()
385 }
386}
387
388fn status_to_str(status: IndexingRunStatus) -> &'static str {
389 match status {
390 IndexingRunStatus::Running => "running",
391 IndexingRunStatus::Succeeded => "succeeded",
392 IndexingRunStatus::Failed => "failed",
393 }
394}
395
396fn status_from_str(s: &str) -> Result<IndexingRunStatus> {
397 Ok(match s {
398 "running" => IndexingRunStatus::Running,
399 "succeeded" => IndexingRunStatus::Succeeded,
400 "failed" => IndexingRunStatus::Failed,
401 other => return Err(anyhow!("unknown indexing run status '{other}'")),
402 })
403}
404
405fn row_to_run(row: &tokio_postgres::Row) -> Result<IndexingRun> {
406 let status = status_from_str(row.get::<_, String>("status").as_str())?;
407 let documents_seen: i64 = row.get("documents_seen");
408 let chunks_indexed: i64 = row.get("chunks_indexed");
409 let documents_skipped: i64 = row.get("documents_skipped");
410 Ok(IndexingRun {
411 id: row.get("id"),
412 connector_name: row.get("connector_name"),
413 status,
414 started_at: row.get("started_at"),
415 finished_at: row.get("finished_at"),
416 documents_seen: usize::try_from(documents_seen).unwrap_or(0),
417 chunks_indexed: usize::try_from(chunks_indexed).unwrap_or(0),
418 documents_skipped: usize::try_from(documents_skipped).unwrap_or(0),
419 cursor: row.get("cursor"),
420 error: row.get("error"),
421 })
422}
423
424impl IndexingStore for PgIndexingStore {
425 fn record_run(&self, run: &IndexingRun) {
426 let this = self.clone();
427 let run = run.clone();
428 let _ = run_blocking(
429 &self.handle,
430 async move { this.record_run_async(run).await },
431 );
432 }
433
434 fn latest_cursor(&self, connector_name: &str) -> Option<Timestamp> {
435 let this = self.clone();
436 let name = connector_name.to_string();
437 run_blocking(
438 &self.handle,
439 async move { this.latest_cursor_async(name).await },
440 )
441 .ok()
442 .flatten()
443 }
444
445 fn list_runs(&self, connector_name: &str) -> Vec<IndexingRun> {
446 let this = self.clone();
447 let name = connector_name.to_string();
448 run_blocking(
449 &self.handle,
450 async move { this.list_runs_async(name).await },
451 )
452 .unwrap_or_default()
453 }
454}