Skip to main content

systemprompt_database/repository/service/
repo.rs

1//! Async repository over the `services` registry table.
2
3use std::sync::Arc;
4
5use sqlx::PgPool;
6
7use super::model::{CreateServiceInput, ServiceConfig};
8use crate::DbPool;
9use crate::error::DatabaseResult;
10
11#[derive(Debug, Clone)]
12pub struct ServiceRepository {
13    pool: Arc<PgPool>,
14    write_pool: Arc<PgPool>,
15}
16
17impl ServiceRepository {
18    pub fn new(db: &DbPool) -> DatabaseResult<Self> {
19        let pool = db.pool_arc()?;
20        let write_pool = db.write_pool_arc()?;
21        Ok(Self { pool, write_pool })
22    }
23
24    pub async fn get_service_by_name(&self, name: &str) -> DatabaseResult<Option<ServiceConfig>> {
25        let row = sqlx::query!(
26            r#"
27            SELECT name, module_name, status, pid, port, binary_mtime,
28                   created_at::text as "created_at!", updated_at::text as "updated_at!"
29            FROM services
30            WHERE name = $1
31            "#,
32            name
33        )
34        .fetch_optional(&*self.pool)
35        .await?;
36
37        Ok(row.map(|r| ServiceConfig {
38            name: r.name,
39            module_name: r.module_name,
40            status: r.status,
41            pid: r.pid,
42            port: r.port,
43            binary_mtime: r.binary_mtime,
44            created_at: r.created_at,
45            updated_at: r.updated_at,
46        }))
47    }
48
49    pub async fn get_all_agent_service_names(&self) -> DatabaseResult<Vec<String>> {
50        let rows = sqlx::query!(r#"SELECT name FROM services WHERE module_name = 'agent'"#)
51            .fetch_all(&*self.pool)
52            .await?;
53        Ok(rows.into_iter().map(|r| r.name).collect())
54    }
55
56    pub async fn get_mcp_services(&self) -> DatabaseResult<Vec<ServiceConfig>> {
57        let rows = sqlx::query!(
58            r#"
59            SELECT name, module_name, status, pid, port, binary_mtime,
60                   created_at::text as "created_at!", updated_at::text as "updated_at!"
61            FROM services
62            WHERE module_name = 'mcp'
63            ORDER BY name
64            "#
65        )
66        .fetch_all(&*self.pool)
67        .await?;
68        Ok(rows
69            .into_iter()
70            .map(|r| ServiceConfig {
71                name: r.name,
72                module_name: r.module_name,
73                status: r.status,
74                pid: r.pid,
75                port: r.port,
76                binary_mtime: r.binary_mtime,
77                created_at: r.created_at,
78                updated_at: r.updated_at,
79            })
80            .collect())
81    }
82
83    pub async fn create_service(&self, input: CreateServiceInput<'_>) -> DatabaseResult<()> {
84        let port_i32 = i32::from(input.port);
85        sqlx::query!(
86            r#"
87            INSERT INTO services (name, module_name, status, port, binary_mtime)
88            VALUES ($1, $2, $3, $4, $5)
89            ON CONFLICT (name) DO UPDATE SET
90              module_name = EXCLUDED.module_name,
91              status = EXCLUDED.status,
92              port = EXCLUDED.port,
93              binary_mtime = EXCLUDED.binary_mtime,
94              updated_at = CURRENT_TIMESTAMP
95            "#,
96            input.name,
97            input.module_name,
98            input.status,
99            port_i32,
100            input.binary_mtime
101        )
102        .execute(&*self.write_pool)
103        .await?;
104        Ok(())
105    }
106
107    pub async fn update_service_status(
108        &self,
109        service_name: &str,
110        status: &str,
111    ) -> DatabaseResult<()> {
112        sqlx::query!(
113            r#"UPDATE services SET status = $1, updated_at = CURRENT_TIMESTAMP WHERE name = $2"#,
114            status,
115            service_name
116        )
117        .execute(&*self.write_pool)
118        .await?;
119        Ok(())
120    }
121
122    pub async fn delete_service(&self, service_name: &str) -> DatabaseResult<()> {
123        sqlx::query!(r#"DELETE FROM services WHERE name = $1"#, service_name)
124            .execute(&*self.write_pool)
125            .await?;
126        Ok(())
127    }
128
129    pub async fn update_service_pid(&self, service_name: &str, pid: i32) -> DatabaseResult<()> {
130        sqlx::query!(
131            r#"UPDATE services SET pid = $1, updated_at = CURRENT_TIMESTAMP WHERE name = $2"#,
132            pid,
133            service_name
134        )
135        .execute(&*self.write_pool)
136        .await?;
137        Ok(())
138    }
139
140    pub async fn clear_service_pid(&self, service_name: &str) -> DatabaseResult<()> {
141        sqlx::query!(
142            r#"UPDATE services SET pid = NULL, updated_at = CURRENT_TIMESTAMP WHERE name = $1"#,
143            service_name
144        )
145        .execute(&*self.write_pool)
146        .await?;
147        Ok(())
148    }
149
150    pub async fn get_all_running_services(&self) -> DatabaseResult<Vec<ServiceConfig>> {
151        let rows = sqlx::query!(
152            r#"
153            SELECT name, module_name, status, pid, port, binary_mtime,
154                   created_at::text as "created_at!", updated_at::text as "updated_at!"
155            FROM services
156            WHERE status = 'running'
157            ORDER BY name
158            "#
159        )
160        .fetch_all(&*self.pool)
161        .await?;
162        Ok(rows
163            .into_iter()
164            .map(|r| ServiceConfig {
165                name: r.name,
166                module_name: r.module_name,
167                status: r.status,
168                pid: r.pid,
169                port: r.port,
170                binary_mtime: r.binary_mtime,
171                created_at: r.created_at,
172                updated_at: r.updated_at,
173            })
174            .collect())
175    }
176
177    pub async fn count_running_services(&self, module_name: &str) -> DatabaseResult<usize> {
178        let row = sqlx::query!(
179            r#"SELECT COUNT(*) as "count!" FROM services WHERE module_name = $1 AND status = 'running'"#,
180            module_name
181        )
182        .fetch_one(&*self.pool)
183        .await?;
184        Ok(usize::try_from(row.count).unwrap_or(0))
185    }
186
187    pub async fn mark_service_crashed(&self, service_name: &str) -> DatabaseResult<()> {
188        sqlx::query!(
189            r#"UPDATE services SET status = 'error', pid = NULL, updated_at = CURRENT_TIMESTAMP WHERE name = $1"#,
190            service_name
191        )
192        .execute(&*self.write_pool)
193        .await?;
194        Ok(())
195    }
196
197    pub async fn update_service_stopped(&self, service_name: &str) -> DatabaseResult<()> {
198        sqlx::query!(
199            r#"UPDATE services SET status = 'stopped', pid = NULL, updated_at = CURRENT_TIMESTAMP WHERE name = $1"#,
200            service_name
201        )
202        .execute(&*self.write_pool)
203        .await?;
204        Ok(())
205    }
206
207    pub async fn get_running_services_with_pid(&self) -> DatabaseResult<Vec<ServiceConfig>> {
208        self.get_all_running_services().await
209    }
210
211    pub async fn get_services_by_type(
212        &self,
213        module_name: &str,
214    ) -> DatabaseResult<Vec<ServiceConfig>> {
215        let rows = sqlx::query!(
216            r#"
217            SELECT name, module_name, status, pid, port, binary_mtime,
218                   created_at::text as "created_at!", updated_at::text as "updated_at!"
219            FROM services
220            WHERE module_name = $1
221            ORDER BY name
222            "#,
223            module_name
224        )
225        .fetch_all(&*self.pool)
226        .await?;
227        Ok(rows
228            .into_iter()
229            .map(|r| ServiceConfig {
230                name: r.name,
231                module_name: r.module_name,
232                status: r.status,
233                pid: r.pid,
234                port: r.port,
235                binary_mtime: r.binary_mtime,
236                created_at: r.created_at,
237                updated_at: r.updated_at,
238            })
239            .collect())
240    }
241
242    pub async fn cleanup_stale_entries(&self) -> DatabaseResult<u64> {
243        let result = sqlx::query!(
244            r#"
245            DELETE FROM services
246            WHERE status IN ('error', 'crashed')
247               OR (status = 'running' AND pid IS NULL)
248            "#
249        )
250        .execute(&*self.write_pool)
251        .await?;
252        Ok(result.rows_affected())
253    }
254}