1mod request;
2mod stats;
3
4pub use request::InsertFileRequest;
5pub use stats::FileStats;
6
7use std::sync::Arc;
8
9use chrono::Utc;
10use sqlx::PgPool;
11use systemprompt_database::DbPool;
12use systemprompt_identifiers::{ContextId, FileId, SessionId, TraceId, UserId};
13
14use crate::error::{FilesError, FilesResult};
15use crate::models::{File, FileMetadata};
16
17#[derive(Debug, Clone)]
18pub struct FileRepository {
19 pub(crate) pool: Arc<PgPool>,
20 write_pool: Arc<PgPool>,
21}
22
23impl FileRepository {
24 pub fn new(db: &DbPool) -> FilesResult<Self> {
25 let pool = db.pool_arc()?;
26 let write_pool = db.write_pool_arc()?;
27 Ok(Self { pool, write_pool })
28 }
29
30 pub async fn insert(&self, request: InsertFileRequest) -> FilesResult<FileId> {
31 let id_uuid = uuid::Uuid::parse_str(request.id.as_str()).map_err(|e| {
32 FilesError::Validation(format!(
33 "Invalid UUID for file id {}: {e}",
34 request.id.as_str()
35 ))
36 })?;
37 let now = Utc::now();
38
39 let user_id_str = request.user_id.as_ref().map(UserId::as_str);
40 let session_id_str = request.session_id.as_ref().map(SessionId::as_str);
41 let trace_id_str = request.trace_id.as_ref().map(TraceId::as_str);
42 let context_id_str = request.context_id.as_ref().map(ContextId::as_str);
43
44 sqlx::query_as!(
45 File,
46 r#"
47 INSERT INTO files (id, path, public_url, mime_type, size_bytes, ai_content, metadata, user_id, session_id, trace_id, context_id, created_at, updated_at)
48 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $12)
49 ON CONFLICT (path) DO UPDATE SET
50 public_url = EXCLUDED.public_url,
51 mime_type = EXCLUDED.mime_type,
52 size_bytes = EXCLUDED.size_bytes,
53 ai_content = EXCLUDED.ai_content,
54 metadata = EXCLUDED.metadata,
55 updated_at = EXCLUDED.updated_at
56 RETURNING id, path, public_url, mime_type, size_bytes, ai_content, metadata, user_id as "user_id: UserId", session_id as "session_id: SessionId", trace_id as "trace_id: TraceId", context_id as "context_id: ContextId", created_at, updated_at, deleted_at
57 "#,
58 id_uuid,
59 request.path,
60 request.public_url,
61 request.mime_type,
62 request.size_bytes,
63 request.ai_content,
64 request.metadata,
65 user_id_str,
66 session_id_str,
67 trace_id_str,
68 context_id_str,
69 now
70 )
71 .fetch_one(&*self.write_pool)
72 .await?;
73
74 Ok(request.id)
75 }
76
77 pub async fn insert_file(&self, file: &File) -> FilesResult<FileId> {
78 let file_id = FileId::new(file.id.to_string());
79
80 let mut request = InsertFileRequest::new(
81 file_id.clone(),
82 file.path.clone(),
83 file.public_url.clone(),
84 file.mime_type.clone(),
85 )
86 .with_ai_content(file.ai_content)
87 .with_metadata(file.metadata.clone());
88
89 if let Some(size) = file.size_bytes {
90 request = request.with_size(size);
91 }
92
93 if let Some(ref user_id) = file.user_id {
94 request = request.with_user_id(user_id.clone());
95 }
96
97 if let Some(ref session_id) = file.session_id {
98 request = request.with_session_id(session_id.clone());
99 }
100
101 if let Some(ref trace_id) = file.trace_id {
102 request = request.with_trace_id(trace_id.clone());
103 }
104
105 if let Some(ref context_id) = file.context_id {
106 request = request.with_context_id(context_id.clone());
107 }
108
109 self.insert(request).await
110 }
111
112 pub async fn find_by_id(&self, id: &FileId) -> FilesResult<Option<File>> {
113 let id_uuid = uuid::Uuid::parse_str(id.as_str())
114 .map_err(|e| FilesError::Validation(format!("Invalid UUID for file id: {e}")))?;
115
116 let result = sqlx::query_as!(
117 File,
118 r#"
119 SELECT id, path, public_url, mime_type, size_bytes, ai_content, metadata, user_id as "user_id: UserId", session_id as "session_id: SessionId", trace_id as "trace_id: TraceId", context_id as "context_id: ContextId", created_at, updated_at, deleted_at
120 FROM files
121 WHERE id = $1 AND deleted_at IS NULL
122 "#,
123 id_uuid
124 )
125 .fetch_optional(&*self.pool)
126 .await?;
127
128 Ok(result)
129 }
130
131 pub async fn find_by_path(&self, path: &str) -> FilesResult<Option<File>> {
132 let result = sqlx::query_as!(
133 File,
134 r#"
135 SELECT id, path, public_url, mime_type, size_bytes, ai_content, metadata, user_id as "user_id: UserId", session_id as "session_id: SessionId", trace_id as "trace_id: TraceId", context_id as "context_id: ContextId", created_at, updated_at, deleted_at
136 FROM files
137 WHERE path = $1 AND deleted_at IS NULL
138 "#,
139 path
140 )
141 .fetch_optional(&*self.pool)
142 .await?;
143
144 Ok(result)
145 }
146
147 pub async fn list_by_user(
148 &self,
149 user_id: &UserId,
150 limit: i64,
151 offset: i64,
152 ) -> FilesResult<Vec<File>> {
153 let user_id_str = user_id.as_str();
154 let result = sqlx::query_as!(
155 File,
156 r#"
157 SELECT id, path, public_url, mime_type, size_bytes, ai_content, metadata, user_id as "user_id: UserId", session_id as "session_id: SessionId", trace_id as "trace_id: TraceId", context_id as "context_id: ContextId", created_at, updated_at, deleted_at
158 FROM files
159 WHERE user_id = $1 AND deleted_at IS NULL
160 ORDER BY created_at DESC
161 LIMIT $2 OFFSET $3
162 "#,
163 user_id_str,
164 limit,
165 offset
166 )
167 .fetch_all(&*self.pool)
168 .await?;
169
170 Ok(result)
171 }
172
173 pub async fn list_all(&self, limit: i64, offset: i64) -> FilesResult<Vec<File>> {
174 let result = sqlx::query_as!(
175 File,
176 r#"
177 SELECT id, path, public_url, mime_type, size_bytes, ai_content, metadata, user_id as "user_id: UserId", session_id as "session_id: SessionId", trace_id as "trace_id: TraceId", context_id as "context_id: ContextId", created_at, updated_at, deleted_at
178 FROM files
179 WHERE deleted_at IS NULL
180 ORDER BY created_at DESC
181 LIMIT $1 OFFSET $2
182 "#,
183 limit,
184 offset
185 )
186 .fetch_all(&*self.pool)
187 .await?;
188
189 Ok(result)
190 }
191
192 pub async fn delete(&self, id: &FileId) -> FilesResult<()> {
193 let id_uuid = uuid::Uuid::parse_str(id.as_str())
194 .map_err(|e| FilesError::Validation(format!("Invalid UUID for file id: {e}")))?;
195
196 sqlx::query!(
197 r#"
198 DELETE FROM files
199 WHERE id = $1
200 "#,
201 id_uuid
202 )
203 .execute(&*self.write_pool)
204 .await?;
205
206 Ok(())
207 }
208
209 pub async fn update_metadata(&self, id: &FileId, metadata: &FileMetadata) -> FilesResult<()> {
210 let id_uuid = uuid::Uuid::parse_str(id.as_str())
211 .map_err(|e| FilesError::Validation(format!("Invalid UUID for file id: {e}")))?;
212 let metadata_json = serde_json::to_value(metadata)
213 .map_err(|e| FilesError::Validation(format!("Failed to serialize metadata: {e}")))?;
214 let now = Utc::now();
215
216 sqlx::query!(
217 r#"
218 UPDATE files
219 SET metadata = $1, updated_at = $2
220 WHERE id = $3
221 "#,
222 metadata_json,
223 now,
224 id_uuid
225 )
226 .execute(&*self.write_pool)
227 .await?;
228
229 Ok(())
230 }
231
232 pub async fn search_by_path(&self, query: &str, limit: i64) -> FilesResult<Vec<File>> {
233 let pattern = format!("%{query}%");
234 let result = sqlx::query_as!(
235 File,
236 r#"
237 SELECT id, path, public_url, mime_type, size_bytes, ai_content, metadata,
238 user_id as "user_id: UserId", session_id as "session_id: SessionId",
239 trace_id as "trace_id: TraceId", context_id as "context_id: ContextId",
240 created_at, updated_at, deleted_at
241 FROM files
242 WHERE path ILIKE $1 AND deleted_at IS NULL
243 ORDER BY created_at DESC
244 LIMIT $2
245 "#,
246 pattern,
247 limit
248 )
249 .fetch_all(&*self.pool)
250 .await?;
251
252 Ok(result)
253 }
254}