1use std::future::Future;
30
31use anyhow::{anyhow, Result};
32use chrono::{DateTime, Utc};
33use deadpool_postgres::Pool;
34use tokio::runtime::Handle;
35
36use smooth_operator::connector_config::{ConnectorConfig, ConnectorConfigStore, ConnectorKind};
37use smooth_operator::settings::{AgentSettings, SettingsStore};
38use smooth_operator_ingestion::indexing::{IndexingRun, IndexingRunStatus, IndexingStore};
39use smooth_operator_ingestion::Timestamp;
40
41fn run_blocking<F, T>(handle: &Handle, fut: F) -> Result<T>
50where
51 F: Future<Output = Result<T>> + Send + 'static,
52 T: Send + 'static,
53{
54 let join = handle.spawn(fut);
55 let (tx, rx) = std::sync::mpsc::channel();
56 std::thread::spawn(move || {
57 let result = (|| -> Result<T> {
58 let rt = tokio::runtime::Builder::new_current_thread()
59 .enable_all()
60 .build()?;
61 let joined = rt.block_on(join);
62 joined.map_err(|e| anyhow!("admin store task panicked or was cancelled: {e}"))?
63 })();
64 let _ = tx.send(result);
65 });
66 rx.recv()
67 .map_err(|e| anyhow!("admin store task channel closed: {e}"))?
68}
69
70#[derive(Clone)]
76pub struct PgConnectorConfigStore {
77 pool: Pool,
78 handle: Handle,
79}
80
81impl PgConnectorConfigStore {
82 #[must_use]
84 pub fn new(pool: Pool, handle: Handle) -> Self {
85 Self { pool, handle }
86 }
87
88 async fn list_async(&self, org_id: String) -> Result<Vec<ConnectorConfig>> {
89 let client = self.pool.get().await?;
90 let rows = client
91 .query(
92 "SELECT id, org_id, name, kind, config, enabled, created_at, updated_at
93 FROM connector_configs
94 WHERE org_id = $1
95 ORDER BY name, id",
96 &[&org_id],
97 )
98 .await?;
99 rows.iter().map(row_to_connector).collect()
100 }
101
102 async fn get_async(&self, org_id: String, id: String) -> Result<Option<ConnectorConfig>> {
103 let client = self.pool.get().await?;
104 let row = client
105 .query_opt(
106 "SELECT id, org_id, name, kind, config, enabled, created_at, updated_at
107 FROM connector_configs
108 WHERE org_id = $1 AND id = $2",
109 &[&org_id, &id],
110 )
111 .await?;
112 row.as_ref().map(row_to_connector).transpose()
113 }
114
115 async fn upsert_async(&self, cfg: ConnectorConfig) -> Result<()> {
116 let client = self.pool.get().await?;
117 client
118 .execute(
119 "INSERT INTO connector_configs
120 (org_id, id, name, kind, config, enabled, created_at, updated_at)
121 VALUES ($1,$2,$3,$4,$5,$6,$7,$8)
122 ON CONFLICT (org_id, id) DO UPDATE SET
123 name = EXCLUDED.name,
124 kind = EXCLUDED.kind,
125 config = EXCLUDED.config,
126 enabled = EXCLUDED.enabled,
127 created_at = EXCLUDED.created_at,
128 updated_at = EXCLUDED.updated_at",
129 &[
130 &cfg.org_id,
131 &cfg.id,
132 &cfg.name,
133 &cfg.kind.as_str(),
134 &cfg.config,
135 &cfg.enabled,
136 &cfg.created_at,
137 &cfg.updated_at,
138 ],
139 )
140 .await?;
141 Ok(())
142 }
143
144 async fn delete_async(&self, org_id: String, id: String) -> Result<bool> {
145 let client = self.pool.get().await?;
146 let n = client
147 .execute(
148 "DELETE FROM connector_configs WHERE org_id = $1 AND id = $2",
149 &[&org_id, &id],
150 )
151 .await?;
152 Ok(n > 0)
153 }
154}
155
156fn row_to_connector(row: &tokio_postgres::Row) -> Result<ConnectorConfig> {
157 let kind_str: String = row.get("kind");
158 let kind = ConnectorKind::parse(&kind_str)
159 .map_err(|bad| anyhow!("unknown connector kind '{bad}' in connector_configs row"))?;
160 Ok(ConnectorConfig {
161 id: row.get("id"),
162 org_id: row.get("org_id"),
163 name: row.get("name"),
164 kind,
165 config: row.get("config"),
166 enabled: row.get("enabled"),
167 created_at: row.get("created_at"),
168 updated_at: row.get("updated_at"),
169 })
170}
171
172impl ConnectorConfigStore for PgConnectorConfigStore {
173 fn list(&self, org_id: &str) -> Vec<ConnectorConfig> {
174 let this = self.clone();
175 let org_id = org_id.to_string();
176 run_blocking(&self.handle, async move { this.list_async(org_id).await }).unwrap_or_default()
177 }
178
179 fn get(&self, org_id: &str, id: &str) -> Option<ConnectorConfig> {
180 let this = self.clone();
181 let org_id = org_id.to_string();
182 let id = id.to_string();
183 run_blocking(
184 &self.handle,
185 async move { this.get_async(org_id, id).await },
186 )
187 .ok()
188 .flatten()
189 }
190
191 fn upsert(&self, config: ConnectorConfig) {
192 let this = self.clone();
193 let _ = run_blocking(&self.handle, async move { this.upsert_async(config).await });
194 }
195
196 fn delete(&self, org_id: &str, id: &str) -> bool {
197 let this = self.clone();
198 let org_id = org_id.to_string();
199 let id = id.to_string();
200 run_blocking(
201 &self.handle,
202 async move { this.delete_async(org_id, id).await },
203 )
204 .unwrap_or(false)
205 }
206}
207
208#[derive(Clone)]
214pub struct PgSettingsStore {
215 pool: Pool,
216 handle: Handle,
217}
218
219impl PgSettingsStore {
220 #[must_use]
222 pub fn new(pool: Pool, handle: Handle) -> Self {
223 Self { pool, handle }
224 }
225
226 async fn get_async(&self, org_id: String) -> Result<Option<AgentSettings>> {
227 let client = self.pool.get().await?;
228 let row = client
229 .query_opt(
230 "SELECT org_id, model, system_prompt, default_tools, updated_at
231 FROM agent_settings WHERE org_id = $1",
232 &[&org_id],
233 )
234 .await?;
235 match row {
236 Some(row) => {
237 let default_tools: serde_json::Value = row.get("default_tools");
238 let default_tools: Vec<String> = serde_json::from_value(default_tools)?;
239 Ok(Some(AgentSettings {
240 org_id: row.get("org_id"),
241 model: row.get("model"),
242 system_prompt: row.get("system_prompt"),
243 default_tools,
244 updated_at: row.get("updated_at"),
245 }))
246 }
247 None => Ok(None),
248 }
249 }
250
251 async fn put_async(&self, settings: AgentSettings) -> Result<()> {
252 let client = self.pool.get().await?;
253 let default_tools = serde_json::to_value(&settings.default_tools)?;
254 client
255 .execute(
256 "INSERT INTO agent_settings
257 (org_id, model, system_prompt, default_tools, updated_at)
258 VALUES ($1,$2,$3,$4,$5)
259 ON CONFLICT (org_id) DO UPDATE SET
260 model = EXCLUDED.model,
261 system_prompt = EXCLUDED.system_prompt,
262 default_tools = EXCLUDED.default_tools,
263 updated_at = EXCLUDED.updated_at",
264 &[
265 &settings.org_id,
266 &settings.model,
267 &settings.system_prompt,
268 &default_tools,
269 &settings.updated_at,
270 ],
271 )
272 .await?;
273 Ok(())
274 }
275}
276
277impl SettingsStore for PgSettingsStore {
278 fn get(&self, org_id: &str) -> AgentSettings {
279 let this = self.clone();
280 let org = org_id.to_string();
281 run_blocking(&self.handle, async move { this.get_async(org).await })
284 .ok()
285 .flatten()
286 .unwrap_or_else(|| AgentSettings::defaults(org_id))
287 }
288
289 fn put(&self, settings: AgentSettings) {
290 let this = self.clone();
291 let _ = run_blocking(&self.handle, async move { this.put_async(settings).await });
292 }
293}
294
295#[derive(Clone)]
301pub struct PgIndexingStore {
302 pool: Pool,
303 handle: Handle,
304}
305
306impl PgIndexingStore {
307 #[must_use]
309 pub fn new(pool: Pool, handle: Handle) -> Self {
310 Self { pool, handle }
311 }
312
313 async fn record_run_async(&self, run: IndexingRun) -> Result<()> {
314 let client = self.pool.get().await?;
315 let status = status_to_str(run.status);
316 let documents_seen = i64::try_from(run.documents_seen).unwrap_or(i64::MAX);
317 let chunks_indexed = i64::try_from(run.chunks_indexed).unwrap_or(i64::MAX);
318 let documents_skipped = i64::try_from(run.documents_skipped).unwrap_or(i64::MAX);
319 client
320 .execute(
321 "INSERT INTO indexing_runs
322 (id, connector_name, status, started_at, finished_at,
323 documents_seen, chunks_indexed, documents_skipped, cursor, error)
324 VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10)
325 ON CONFLICT (id) DO UPDATE SET
326 connector_name = EXCLUDED.connector_name,
327 status = EXCLUDED.status,
328 started_at = EXCLUDED.started_at,
329 finished_at = EXCLUDED.finished_at,
330 documents_seen = EXCLUDED.documents_seen,
331 chunks_indexed = EXCLUDED.chunks_indexed,
332 documents_skipped = EXCLUDED.documents_skipped,
333 cursor = EXCLUDED.cursor,
334 error = EXCLUDED.error",
335 &[
336 &run.id,
337 &run.connector_name,
338 &status,
339 &run.started_at,
340 &run.finished_at,
341 &documents_seen,
342 &chunks_indexed,
343 &documents_skipped,
344 &run.cursor,
345 &run.error,
346 ],
347 )
348 .await?;
349 Ok(())
350 }
351
352 async fn latest_cursor_async(&self, connector_name: String) -> Result<Option<Timestamp>> {
353 let client = self.pool.get().await?;
354 let row = client
356 .query_one(
357 "SELECT max(cursor) AS c
358 FROM indexing_runs
359 WHERE connector_name = $1 AND status = 'succeeded'",
360 &[&connector_name],
361 )
362 .await?;
363 Ok(row.get::<_, Option<DateTime<Utc>>>("c"))
364 }
365
366 async fn list_runs_async(&self, connector_name: String) -> Result<Vec<IndexingRun>> {
367 let client = self.pool.get().await?;
368 let rows = client
370 .query(
371 "SELECT id, connector_name, status, started_at, finished_at,
372 documents_seen, chunks_indexed, documents_skipped, cursor, error
373 FROM indexing_runs
374 WHERE connector_name = $1
375 ORDER BY started_at ASC, id ASC",
376 &[&connector_name],
377 )
378 .await?;
379 rows.iter().map(row_to_run).collect()
380 }
381}
382
383fn status_to_str(status: IndexingRunStatus) -> &'static str {
384 match status {
385 IndexingRunStatus::Running => "running",
386 IndexingRunStatus::Succeeded => "succeeded",
387 IndexingRunStatus::Failed => "failed",
388 }
389}
390
391fn status_from_str(s: &str) -> Result<IndexingRunStatus> {
392 Ok(match s {
393 "running" => IndexingRunStatus::Running,
394 "succeeded" => IndexingRunStatus::Succeeded,
395 "failed" => IndexingRunStatus::Failed,
396 other => return Err(anyhow!("unknown indexing run status '{other}'")),
397 })
398}
399
400fn row_to_run(row: &tokio_postgres::Row) -> Result<IndexingRun> {
401 let status = status_from_str(row.get::<_, String>("status").as_str())?;
402 let documents_seen: i64 = row.get("documents_seen");
403 let chunks_indexed: i64 = row.get("chunks_indexed");
404 let documents_skipped: i64 = row.get("documents_skipped");
405 Ok(IndexingRun {
406 id: row.get("id"),
407 connector_name: row.get("connector_name"),
408 status,
409 started_at: row.get("started_at"),
410 finished_at: row.get("finished_at"),
411 documents_seen: usize::try_from(documents_seen).unwrap_or(0),
412 chunks_indexed: usize::try_from(chunks_indexed).unwrap_or(0),
413 documents_skipped: usize::try_from(documents_skipped).unwrap_or(0),
414 cursor: row.get("cursor"),
415 error: row.get("error"),
416 })
417}
418
419impl IndexingStore for PgIndexingStore {
420 fn record_run(&self, run: &IndexingRun) {
421 let this = self.clone();
422 let run = run.clone();
423 let _ = run_blocking(
424 &self.handle,
425 async move { this.record_run_async(run).await },
426 );
427 }
428
429 fn latest_cursor(&self, connector_name: &str) -> Option<Timestamp> {
430 let this = self.clone();
431 let name = connector_name.to_string();
432 run_blocking(
433 &self.handle,
434 async move { this.latest_cursor_async(name).await },
435 )
436 .ok()
437 .flatten()
438 }
439
440 fn list_runs(&self, connector_name: &str) -> Vec<IndexingRun> {
441 let this = self.clone();
442 let name = connector_name.to_string();
443 run_blocking(
444 &self.handle,
445 async move { this.list_runs_async(name).await },
446 )
447 .unwrap_or_default()
448 }
449}