1mod request;
9mod stats;
10
11pub use request::InsertFileRequest;
12pub use stats::FileStats;
13
14use std::sync::Arc;
15
16use chrono::Utc;
17use sqlx::PgPool;
18use systemprompt_database::DbPool;
19use systemprompt_identifiers::{ContextId, FileId, SessionId, TraceId, UserId};
20
21use crate::error::{FilesError, FilesResult};
22use crate::models::{File, FileMetadata};
23
24#[derive(Debug, Clone)]
25pub struct FileRepository {
26 pub(crate) pool: Arc<PgPool>,
27 write_pool: Arc<PgPool>,
28}
29
30impl FileRepository {
31 pub fn new(db: &DbPool) -> FilesResult<Self> {
32 let pool = db.pool_arc()?;
33 let write_pool = db.write_pool_arc()?;
34 Ok(Self { pool, write_pool })
35 }
36
37 pub async fn insert(&self, request: InsertFileRequest) -> FilesResult<FileId> {
38 let id_uuid = uuid::Uuid::parse_str(request.id.as_str()).map_err(|e| {
39 FilesError::Validation(format!(
40 "Invalid UUID for file id {}: {e}",
41 request.id.as_str()
42 ))
43 })?;
44 let now = Utc::now();
45
46 let user_id_str = request.user_id.as_ref().map(UserId::as_str);
47 let session_id_str = request.session_id.as_ref().map(SessionId::as_str);
48 let trace_id_str = request.trace_id.as_ref().map(TraceId::as_str);
49 let context_id_str = request.context_id.as_ref().map(ContextId::as_str);
50
51 sqlx::query_as!(
52 File,
53 r#"
54 INSERT INTO files (id, path, public_url, mime_type, size_bytes, ai_content, metadata, user_id, session_id, trace_id, context_id, created_at, updated_at)
55 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $12)
56 ON CONFLICT (path) DO UPDATE SET
57 public_url = EXCLUDED.public_url,
58 mime_type = EXCLUDED.mime_type,
59 size_bytes = EXCLUDED.size_bytes,
60 ai_content = EXCLUDED.ai_content,
61 metadata = EXCLUDED.metadata,
62 updated_at = EXCLUDED.updated_at
63 RETURNING id, path, public_url, mime_type, size_bytes, ai_content, metadata, user_id as "user_id: UserId", session_id as "session_id: SessionId", trace_id as "trace_id: TraceId", context_id as "context_id: ContextId", created_at, updated_at, deleted_at
64 "#,
65 id_uuid,
66 request.path,
67 request.public_url,
68 request.mime_type,
69 request.size_bytes,
70 request.ai_content,
71 request.metadata,
72 user_id_str,
73 session_id_str,
74 trace_id_str,
75 context_id_str,
76 now
77 )
78 .fetch_one(&*self.write_pool)
79 .await?;
80
81 Ok(request.id)
82 }
83
84 pub async fn insert_file(&self, file: &File) -> FilesResult<FileId> {
85 let file_id = FileId::new(file.id.to_string());
86
87 let mut request = InsertFileRequest::new(
88 file_id.clone(),
89 file.path.clone(),
90 file.public_url.clone(),
91 file.mime_type.clone(),
92 )
93 .with_ai_content(file.ai_content)
94 .with_metadata(file.metadata.clone());
95
96 if let Some(size) = file.size_bytes {
97 request = request.with_size(size);
98 }
99
100 if let Some(ref user_id) = file.user_id {
101 request = request.with_user_id(user_id.clone());
102 }
103
104 if let Some(ref session_id) = file.session_id {
105 request = request.with_session_id(session_id.clone());
106 }
107
108 if let Some(ref trace_id) = file.trace_id {
109 request = request.with_trace_id(trace_id.clone());
110 }
111
112 if let Some(ref context_id) = file.context_id {
113 request = request.with_context_id(context_id.clone());
114 }
115
116 self.insert(request).await
117 }
118
119 pub async fn find_by_id(&self, id: &FileId) -> FilesResult<Option<File>> {
120 let id_uuid = uuid::Uuid::parse_str(id.as_str())
121 .map_err(|e| FilesError::Validation(format!("Invalid UUID for file id: {e}")))?;
122
123 let result = sqlx::query_as!(
124 File,
125 r#"
126 SELECT id, path, public_url, mime_type, size_bytes, ai_content, metadata, user_id as "user_id: UserId", session_id as "session_id: SessionId", trace_id as "trace_id: TraceId", context_id as "context_id: ContextId", created_at, updated_at, deleted_at
127 FROM files
128 WHERE id = $1 AND deleted_at IS NULL
129 "#,
130 id_uuid
131 )
132 .fetch_optional(&*self.pool)
133 .await?;
134
135 Ok(result)
136 }
137
138 pub async fn find_by_path(&self, path: &str) -> FilesResult<Option<File>> {
139 let result = sqlx::query_as!(
140 File,
141 r#"
142 SELECT id, path, public_url, mime_type, size_bytes, ai_content, metadata, user_id as "user_id: UserId", session_id as "session_id: SessionId", trace_id as "trace_id: TraceId", context_id as "context_id: ContextId", created_at, updated_at, deleted_at
143 FROM files
144 WHERE path = $1 AND deleted_at IS NULL
145 "#,
146 path
147 )
148 .fetch_optional(&*self.pool)
149 .await?;
150
151 Ok(result)
152 }
153
154 pub async fn list_by_user(
155 &self,
156 user_id: &UserId,
157 limit: i64,
158 offset: i64,
159 ) -> FilesResult<Vec<File>> {
160 let user_id_str = user_id.as_str();
161 let result = sqlx::query_as!(
162 File,
163 r#"
164 SELECT id, path, public_url, mime_type, size_bytes, ai_content, metadata, user_id as "user_id: UserId", session_id as "session_id: SessionId", trace_id as "trace_id: TraceId", context_id as "context_id: ContextId", created_at, updated_at, deleted_at
165 FROM files
166 WHERE user_id = $1 AND deleted_at IS NULL
167 ORDER BY created_at DESC
168 LIMIT $2 OFFSET $3
169 "#,
170 user_id_str,
171 limit,
172 offset
173 )
174 .fetch_all(&*self.pool)
175 .await?;
176
177 Ok(result)
178 }
179
180 pub async fn list_all(&self, limit: i64, offset: i64) -> FilesResult<Vec<File>> {
181 let result = sqlx::query_as!(
182 File,
183 r#"
184 SELECT id, path, public_url, mime_type, size_bytes, ai_content, metadata, user_id as "user_id: UserId", session_id as "session_id: SessionId", trace_id as "trace_id: TraceId", context_id as "context_id: ContextId", created_at, updated_at, deleted_at
185 FROM files
186 WHERE deleted_at IS NULL
187 ORDER BY created_at DESC
188 LIMIT $1 OFFSET $2
189 "#,
190 limit,
191 offset
192 )
193 .fetch_all(&*self.pool)
194 .await?;
195
196 Ok(result)
197 }
198
199 pub async fn delete(&self, id: &FileId) -> FilesResult<()> {
200 let id_uuid = uuid::Uuid::parse_str(id.as_str())
201 .map_err(|e| FilesError::Validation(format!("Invalid UUID for file id: {e}")))?;
202
203 sqlx::query!(
204 r#"
205 DELETE FROM files
206 WHERE id = $1
207 "#,
208 id_uuid
209 )
210 .execute(&*self.write_pool)
211 .await?;
212
213 Ok(())
214 }
215
216 pub async fn update_metadata(&self, id: &FileId, metadata: &FileMetadata) -> FilesResult<()> {
217 let id_uuid = uuid::Uuid::parse_str(id.as_str())
218 .map_err(|e| FilesError::Validation(format!("Invalid UUID for file id: {e}")))?;
219 let metadata_json = serde_json::to_value(metadata)
220 .map_err(|e| FilesError::Validation(format!("Failed to serialize metadata: {e}")))?;
221 let now = Utc::now();
222
223 sqlx::query!(
224 r#"
225 UPDATE files
226 SET metadata = $1, updated_at = $2
227 WHERE id = $3
228 "#,
229 metadata_json,
230 now,
231 id_uuid
232 )
233 .execute(&*self.write_pool)
234 .await?;
235
236 Ok(())
237 }
238
239 pub async fn search_by_path(&self, query: &str, limit: i64) -> FilesResult<Vec<File>> {
240 let pattern = format!("%{query}%");
241 let result = sqlx::query_as!(
242 File,
243 r#"
244 SELECT id, path, public_url, mime_type, size_bytes, ai_content, metadata,
245 user_id as "user_id: UserId", session_id as "session_id: SessionId",
246 trace_id as "trace_id: TraceId", context_id as "context_id: ContextId",
247 created_at, updated_at, deleted_at
248 FROM files
249 WHERE path ILIKE $1 AND deleted_at IS NULL
250 ORDER BY created_at DESC
251 LIMIT $2
252 "#,
253 pattern,
254 limit
255 )
256 .fetch_all(&*self.pool)
257 .await?;
258
259 Ok(result)
260 }
261}