systemprompt_database/repository/service/
repo.rs1use std::sync::Arc;
4
5use sqlx::PgPool;
6
7use super::model::{CreateServiceInput, ServiceConfig};
8use crate::DbPool;
9use crate::error::DatabaseResult;
10
11#[derive(Debug, Clone)]
12pub struct ServiceRepository {
13 pool: Arc<PgPool>,
14 write_pool: Arc<PgPool>,
15}
16
17impl ServiceRepository {
18 pub fn new(db: &DbPool) -> DatabaseResult<Self> {
19 let pool = db.pool_arc()?;
20 let write_pool = db.write_pool_arc()?;
21 Ok(Self { pool, write_pool })
22 }
23
24 pub async fn get_service_by_name(&self, name: &str) -> DatabaseResult<Option<ServiceConfig>> {
25 let row = sqlx::query!(
26 r#"
27 SELECT name, module_name, status, pid, port, binary_mtime,
28 created_at::text as "created_at!", updated_at::text as "updated_at!"
29 FROM services
30 WHERE name = $1
31 "#,
32 name
33 )
34 .fetch_optional(&*self.pool)
35 .await?;
36
37 Ok(row.map(|r| ServiceConfig {
38 name: r.name,
39 module_name: r.module_name,
40 status: r.status,
41 pid: r.pid,
42 port: r.port,
43 binary_mtime: r.binary_mtime,
44 created_at: r.created_at,
45 updated_at: r.updated_at,
46 }))
47 }
48
49 pub async fn get_all_agent_service_names(&self) -> DatabaseResult<Vec<String>> {
50 let rows = sqlx::query!(r#"SELECT name FROM services WHERE module_name = 'agent'"#)
51 .fetch_all(&*self.pool)
52 .await?;
53 Ok(rows.into_iter().map(|r| r.name).collect())
54 }
55
56 pub async fn get_mcp_services(&self) -> DatabaseResult<Vec<ServiceConfig>> {
57 let rows = sqlx::query!(
58 r#"
59 SELECT name, module_name, status, pid, port, binary_mtime,
60 created_at::text as "created_at!", updated_at::text as "updated_at!"
61 FROM services
62 WHERE module_name = 'mcp'
63 ORDER BY name
64 "#
65 )
66 .fetch_all(&*self.pool)
67 .await?;
68 Ok(rows
69 .into_iter()
70 .map(|r| ServiceConfig {
71 name: r.name,
72 module_name: r.module_name,
73 status: r.status,
74 pid: r.pid,
75 port: r.port,
76 binary_mtime: r.binary_mtime,
77 created_at: r.created_at,
78 updated_at: r.updated_at,
79 })
80 .collect())
81 }
82
83 pub async fn create_service(&self, input: CreateServiceInput<'_>) -> DatabaseResult<()> {
84 let port_i32 = i32::from(input.port);
85 sqlx::query!(
86 r#"
87 INSERT INTO services (name, module_name, status, port, binary_mtime)
88 VALUES ($1, $2, $3, $4, $5)
89 ON CONFLICT (name) DO UPDATE SET
90 module_name = EXCLUDED.module_name,
91 status = EXCLUDED.status,
92 port = EXCLUDED.port,
93 binary_mtime = EXCLUDED.binary_mtime,
94 updated_at = CURRENT_TIMESTAMP
95 "#,
96 input.name,
97 input.module_name,
98 input.status,
99 port_i32,
100 input.binary_mtime
101 )
102 .execute(&*self.write_pool)
103 .await?;
104 Ok(())
105 }
106
107 pub async fn update_service_status(
108 &self,
109 service_name: &str,
110 status: &str,
111 ) -> DatabaseResult<()> {
112 sqlx::query!(
113 r#"UPDATE services SET status = $1, updated_at = CURRENT_TIMESTAMP WHERE name = $2"#,
114 status,
115 service_name
116 )
117 .execute(&*self.write_pool)
118 .await?;
119 Ok(())
120 }
121
122 pub async fn delete_service(&self, service_name: &str) -> DatabaseResult<()> {
123 sqlx::query!(r#"DELETE FROM services WHERE name = $1"#, service_name)
124 .execute(&*self.write_pool)
125 .await?;
126 Ok(())
127 }
128
129 pub async fn update_service_pid(&self, service_name: &str, pid: i32) -> DatabaseResult<()> {
130 sqlx::query!(
131 r#"UPDATE services SET pid = $1, updated_at = CURRENT_TIMESTAMP WHERE name = $2"#,
132 pid,
133 service_name
134 )
135 .execute(&*self.write_pool)
136 .await?;
137 Ok(())
138 }
139
140 pub async fn clear_service_pid(&self, service_name: &str) -> DatabaseResult<()> {
141 sqlx::query!(
142 r#"UPDATE services SET pid = NULL, updated_at = CURRENT_TIMESTAMP WHERE name = $1"#,
143 service_name
144 )
145 .execute(&*self.write_pool)
146 .await?;
147 Ok(())
148 }
149
150 pub async fn get_all_running_services(&self) -> DatabaseResult<Vec<ServiceConfig>> {
151 let rows = sqlx::query!(
152 r#"
153 SELECT name, module_name, status, pid, port, binary_mtime,
154 created_at::text as "created_at!", updated_at::text as "updated_at!"
155 FROM services
156 WHERE status = 'running'
157 ORDER BY name
158 "#
159 )
160 .fetch_all(&*self.pool)
161 .await?;
162 Ok(rows
163 .into_iter()
164 .map(|r| ServiceConfig {
165 name: r.name,
166 module_name: r.module_name,
167 status: r.status,
168 pid: r.pid,
169 port: r.port,
170 binary_mtime: r.binary_mtime,
171 created_at: r.created_at,
172 updated_at: r.updated_at,
173 })
174 .collect())
175 }
176
177 pub async fn count_running_services(&self, module_name: &str) -> DatabaseResult<usize> {
178 let row = sqlx::query!(
179 r#"SELECT COUNT(*) as "count!" FROM services WHERE module_name = $1 AND status = 'running'"#,
180 module_name
181 )
182 .fetch_one(&*self.pool)
183 .await?;
184 Ok(usize::try_from(row.count).unwrap_or(0))
185 }
186
187 pub async fn mark_service_crashed(&self, service_name: &str) -> DatabaseResult<()> {
188 sqlx::query!(
189 r#"UPDATE services SET status = 'error', pid = NULL, updated_at = CURRENT_TIMESTAMP WHERE name = $1"#,
190 service_name
191 )
192 .execute(&*self.write_pool)
193 .await?;
194 Ok(())
195 }
196
197 pub async fn update_service_stopped(&self, service_name: &str) -> DatabaseResult<()> {
198 sqlx::query!(
199 r#"UPDATE services SET status = 'stopped', pid = NULL, updated_at = CURRENT_TIMESTAMP WHERE name = $1"#,
200 service_name
201 )
202 .execute(&*self.write_pool)
203 .await?;
204 Ok(())
205 }
206
207 pub async fn get_running_services_with_pid(&self) -> DatabaseResult<Vec<ServiceConfig>> {
208 self.get_all_running_services().await
209 }
210
211 pub async fn get_services_by_type(
212 &self,
213 module_name: &str,
214 ) -> DatabaseResult<Vec<ServiceConfig>> {
215 let rows = sqlx::query!(
216 r#"
217 SELECT name, module_name, status, pid, port, binary_mtime,
218 created_at::text as "created_at!", updated_at::text as "updated_at!"
219 FROM services
220 WHERE module_name = $1
221 ORDER BY name
222 "#,
223 module_name
224 )
225 .fetch_all(&*self.pool)
226 .await?;
227 Ok(rows
228 .into_iter()
229 .map(|r| ServiceConfig {
230 name: r.name,
231 module_name: r.module_name,
232 status: r.status,
233 pid: r.pid,
234 port: r.port,
235 binary_mtime: r.binary_mtime,
236 created_at: r.created_at,
237 updated_at: r.updated_at,
238 })
239 .collect())
240 }
241
242 pub async fn cleanup_stale_entries(&self) -> DatabaseResult<u64> {
243 let result = sqlx::query!(
244 r#"
245 DELETE FROM services
246 WHERE status IN ('error', 'crashed')
247 OR (status = 'running' AND pid IS NULL)
248 "#
249 )
250 .execute(&*self.write_pool)
251 .await?;
252 Ok(result.rows_affected())
253 }
254}