1use kagi_sync::domain::project_state::ProjectFile;
2use sha2::Digest;
3use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
4use sqlx::{Pool, Row, Sqlite};
5use std::path::Path;
6use std::time::Duration;
7
8pub struct SqliteRemoteRepository {
9 pool: Pool<Sqlite>,
10}
11
12pub struct PushProjectStateRequest<'a> {
13 pub project_id: &'a str,
14 pub base_revision: i64,
15 pub kagi_json: &'a str,
16 pub access_json: &'a str,
17 pub files: &'a [ProjectFile],
18 pub activate_tokens: &'a [String],
19 pub revoke_tokens: &'a [String],
20 pub accepted_joins: &'a [String],
21 pub manifest_json: Option<&'a str>,
22 pub manifest_signature: Option<&'a str>,
23}
24
/// Borrowed arguments for [`SqliteRemoteRepository::create_project_member`];
/// each field is bound to the `project_members` column of the same name.
pub struct CreateProjectMemberRequest<'a> {
    /// Project the member belongs to.
    pub project_id: &'a str,
    /// Identifier of the member within the project.
    pub member_id: &'a str,
    /// Member name as stored in the `name` column.
    pub name: &'a str,
    /// Role string stored verbatim (the tests use `"admin"`).
    pub role: &'a str,
    /// Status string stored verbatim (the tests use `"active"`).
    pub status: &'a str,
    /// Value stored in the `recipient` column.
    pub recipient: &'a str,
    /// Value stored in the `claim_secret_hash` column.
    pub claim_secret_hash: &'a str,
}
34
/// Borrowed arguments for [`SqliteRemoteRepository::upsert_join_request`];
/// each field is bound to the `join_requests` column of the same name.
pub struct UpsertJoinRequest<'a> {
    /// Project the join request targets.
    pub project_id: &'a str,
    /// Member asking to join; together with `project_id` it is the conflict key of the upsert.
    pub member_id: &'a str,
    /// Token id attached to the request; an existing row is only updated when this matches.
    pub request_token_id: &'a str,
    /// Name as supplied by the requester.
    pub name: &'a str,
    /// Value stored in the `normalized_name` column.
    pub normalized_name: &'a str,
    /// Value stored in the `recipient` column.
    pub recipient: &'a str,
    /// Value stored in the `signing_public_key` column.
    pub signing_public_key: &'a str,
}
44
/// Borrowed arguments for [`SqliteRemoteRepository::approve_project_request_tx`].
pub struct ApproveProjectRequest<'a> {
    /// Project to create; the pending request with this id is deleted afterwards.
    pub project_id: &'a str,
    /// Member id of the requester, inserted as the project's first member.
    pub requester_member_id: &'a str,
    /// Stored in the new member's `name` column.
    pub requester_name: &'a str,
    /// Stored in the new member's `recipient` column.
    pub requester_recipient: &'a str,
    /// Stored in the new member's `claim_secret_hash` column.
    pub claim_secret_hash: &'a str,
    /// Id of the project token inserted for the requester.
    pub token_id: &'a str,
    /// Hash stored for that project token.
    pub token_hash: &'a str,
    /// Stored in the token's `capabilities_json` column.
    pub caps_json: &'a str,
    /// Stored in the member's `wrapped_project_token` column.
    pub wrapped_b64: &'a str,
}
56
57impl SqliteRemoteRepository {
58 pub async fn new_file(path: impl AsRef<Path>) -> Result<Self, sqlx::Error> {
59 let opts = SqliteConnectOptions::new()
60 .filename(path.as_ref())
61 .create_if_missing(true);
62 Self::connect_with(opts).await
63 }
64
65 async fn connect_with(opts: SqliteConnectOptions) -> Result<Self, sqlx::Error> {
66 let pool = SqlitePoolOptions::new()
67 .max_connections(5)
68 .min_connections(1)
69 .acquire_timeout(Duration::from_secs(5))
70 .connect_with(opts)
71 .await?;
72
73 sqlx::query("PRAGMA foreign_keys = ON;")
74 .execute(&pool)
75 .await?;
76 sqlx::query("PRAGMA journal_mode = WAL;")
77 .execute(&pool)
78 .await?;
79 sqlx::query("PRAGMA synchronous = FULL;")
80 .execute(&pool)
81 .await?;
82 sqlx::query("PRAGMA busy_timeout = 5000;")
83 .execute(&pool)
84 .await?;
85
86 sqlx::migrate!("./migrations").run(&pool).await?;
87
88 Ok(Self { pool })
89 }
90
91 pub async fn create_project(&self, project_id: &str) -> Result<(), sqlx::Error> {
92 let now = time::OffsetDateTime::now_utc().to_string();
93 sqlx::query(
94 "INSERT INTO projects (project_id, revision, created_at, updated_at) VALUES (?, 0, ?, ?)"
95 )
96 .bind(project_id)
97 .bind(&now)
98 .bind(&now)
99 .execute(&self.pool)
100 .await?;
101 Ok(())
102 }
103
104 pub async fn create_token(
105 &self,
106 project_id: &str,
107 token_id: &str,
108 token_hash: &str,
109 capabilities_json: &str,
110 member_id: Option<&str>,
111 status: &str,
112 ) -> Result<(), sqlx::Error> {
113 let now = time::OffsetDateTime::now_utc().to_string();
114 sqlx::query(
115 "INSERT INTO project_tokens (project_id, token_id, token_hash, capabilities_json, member_id, status, created_at)
116 VALUES (?, ?, ?, ?, ?, ?, ?)"
117 )
118 .bind(project_id)
119 .bind(token_id)
120 .bind(token_hash)
121 .bind(capabilities_json)
122 .bind(member_id)
123 .bind(status)
124 .bind(&now)
125 .execute(&self.pool)
126 .await?;
127 Ok(())
128 }
129
130 pub async fn authenticate_token(
131 &self,
132 project_id: &str,
133 token_hash: &str,
134 ) -> Result<Option<(String, Vec<String>, Option<String>)>, sqlx::Error> {
135 let row = sqlx::query(
136 "SELECT token_id, capabilities_json, member_id FROM project_tokens
137 WHERE project_id = ? AND token_hash = ? AND status = 'active'",
138 )
139 .bind(project_id)
140 .bind(token_hash)
141 .fetch_optional(&self.pool)
142 .await?;
143
144 Ok(row.map(|r| {
145 let token_id: String = r.try_get("token_id").unwrap_or_default();
146 let caps_json: String = r.try_get("capabilities_json").unwrap_or_default();
147 let member_id: Option<String> = r.try_get("member_id").ok();
148 let caps: Vec<String> = serde_json::from_str(&caps_json).unwrap_or_default();
149 (token_id, caps, member_id)
150 }))
151 }
152
153 pub async fn push_project_state(
154 &self,
155 request: PushProjectStateRequest<'_>,
156 ) -> Result<i64, sqlx::Error> {
157 let mut tx = self.pool.begin().await?;
158
159 let current_row = sqlx::query("SELECT revision FROM projects WHERE project_id = ?")
160 .bind(request.project_id)
161 .fetch_optional(&mut *tx)
162 .await?;
163 let current_revision = current_row
164 .map(|r| r.try_get::<i64, _>("revision").unwrap_or(0))
165 .unwrap_or(0);
166 if current_revision != request.base_revision {
167 return Err(sqlx::Error::RowNotFound);
168 }
169
170 sqlx::query("DELETE FROM project_files WHERE project_id = ?")
171 .bind(request.project_id)
172 .execute(&mut *tx)
173 .await?;
174
175 let now = time::OffsetDateTime::now_utc().to_string();
176 for file in request.files {
177 sqlx::query(
178 "INSERT INTO project_files (project_id, path, content, sha256, updated_at) VALUES (?, ?, ?, ?, ?)"
179 )
180 .bind(request.project_id)
181 .bind(&file.path)
182 .bind(&file.content)
183 .bind(&file.sha256)
184 .bind(&now)
185 .execute(&mut *tx)
186 .await?;
187 }
188
189 let new_revision = request.base_revision + 1;
190 sqlx::query(
191 "UPDATE projects SET revision = ?, kagi_json = ?, access_json = ?, updated_at = ? WHERE project_id = ?"
192 )
193 .bind(new_revision)
194 .bind(request.kagi_json)
195 .bind(request.access_json)
196 .bind(&now)
197 .bind(request.project_id)
198 .execute(&mut *tx)
199 .await?;
200
201 for token_id in request.activate_tokens {
202 sqlx::query(
203 "UPDATE project_tokens SET status = 'active', activated_at = ? WHERE project_id = ? AND token_id = ? AND status = 'pending_activation'"
204 )
205 .bind(&now)
206 .bind(request.project_id)
207 .bind(token_id)
208 .execute(&mut *tx)
209 .await?;
210 }
211
212 for token_id in request.revoke_tokens {
213 sqlx::query(
214 "UPDATE project_tokens SET status = 'revoked', revoked_at = ? WHERE project_id = ? AND token_id = ?"
215 )
216 .bind(&now)
217 .bind(request.project_id)
218 .bind(token_id)
219 .execute(&mut *tx)
220 .await?;
221 }
222
223 for member_id in request.accepted_joins {
224 sqlx::query(
225 "UPDATE join_requests SET status = 'accepted', updated_at = ? WHERE project_id = ? AND member_id = ?"
226 )
227 .bind(&now)
228 .bind(request.project_id)
229 .bind(member_id)
230 .execute(&mut *tx)
231 .await?;
232 }
233
234 if let Some(manifest_json) = request.manifest_json {
235 let manifest_hash = {
236 let mut hasher = sha2::Sha256::new();
237 hasher.update(manifest_json.as_bytes());
238 hex::encode(hasher.finalize())
239 };
240 sqlx::query(
241 "INSERT INTO manifests (project_id, revision, manifest_hash, manifest_json, manifest_signature, created_at) VALUES (?, ?, ?, ?, ?, ?)"
242 )
243 .bind(request.project_id)
244 .bind(new_revision)
245 .bind(&manifest_hash)
246 .bind(manifest_json)
247 .bind(request.manifest_signature)
248 .bind(&now)
249 .execute(&mut *tx)
250 .await?;
251 }
252
253 tx.commit().await?;
254 Ok(new_revision)
255 }
256
257 pub async fn pull_project_state(
258 &self,
259 project_id: &str,
260 ) -> Result<Option<(i64, Vec<ProjectFile>)>, sqlx::Error> {
261 let revision_row = sqlx::query("SELECT revision FROM projects WHERE project_id = ?")
262 .bind(project_id)
263 .fetch_optional(&self.pool)
264 .await?;
265 let revision = match revision_row {
266 Some(r) => r.try_get::<i64, _>("revision")?,
267 None => return Ok(None),
268 };
269
270 let file_rows =
271 sqlx::query("SELECT path, content, sha256 FROM project_files WHERE project_id = ?")
272 .bind(project_id)
273 .fetch_all(&self.pool)
274 .await?;
275
276 let project_files = file_rows
277 .into_iter()
278 .map(|r| ProjectFile {
279 path: r.try_get("path").unwrap_or_default(),
280 content: r.try_get("content").unwrap_or_default(),
281 sha256: r.try_get("sha256").ok(),
282 })
283 .collect();
284
285 Ok(Some((revision, project_files)))
286 }
287
288 pub async fn get_manifest(
289 &self,
290 project_id: &str,
291 revision: i64,
292 ) -> Result<Option<(String, String, Option<String>)>, sqlx::Error> {
293 let row = sqlx::query(
294 "SELECT manifest_hash, manifest_json, manifest_signature FROM manifests WHERE project_id = ? AND revision = ?",
295 )
296 .bind(project_id)
297 .bind(revision)
298 .fetch_optional(&self.pool)
299 .await?;
300
301 Ok(row.map(|r| {
302 (
303 r.try_get("manifest_hash").unwrap_or_default(),
304 r.try_get("manifest_json").unwrap_or_default(),
305 r.try_get("manifest_signature").ok(),
306 )
307 }))
308 }
309
310 pub async fn list_join_requests(
311 &self,
312 project_id: &str,
313 ) -> Result<Vec<(String, String, String, Option<String>, String)>, sqlx::Error> {
314 let rows = sqlx::query(
315 "SELECT member_id, name, recipient, signing_public_key, created_at FROM join_requests
316 WHERE project_id = ? AND status = 'pending'",
317 )
318 .bind(project_id)
319 .fetch_all(&self.pool)
320 .await?;
321
322 Ok(rows
323 .into_iter()
324 .map(|r| {
325 (
326 r.try_get("member_id").unwrap_or_default(),
327 r.try_get("name").unwrap_or_default(),
328 r.try_get("recipient").unwrap_or_default(),
329 r.try_get("signing_public_key").ok(),
330 r.try_get("created_at").unwrap_or_default(),
331 )
332 })
333 .collect())
334 }
335
336 pub async fn upsert_join_request(
337 &self,
338 request: UpsertJoinRequest<'_>,
339 ) -> Result<(), sqlx::Error> {
340 let now = time::OffsetDateTime::now_utc().to_string();
341 sqlx::query(
342 "INSERT INTO join_requests (project_id, member_id, request_token_id, name, normalized_name, recipient, signing_public_key, status, created_at, updated_at)
343 VALUES (?, ?, ?, ?, ?, ?, ?, 'pending', ?, ?)
344 ON CONFLICT(project_id, member_id) DO UPDATE SET
345 request_token_id = excluded.request_token_id,
346 name = excluded.name,
347 normalized_name = excluded.normalized_name,
348 recipient = excluded.recipient,
349 signing_public_key = excluded.signing_public_key,
350 updated_at = excluded.updated_at
351 WHERE join_requests.request_token_id = excluded.request_token_id"
352 )
353 .bind(request.project_id)
354 .bind(request.member_id)
355 .bind(request.request_token_id)
356 .bind(request.name)
357 .bind(request.normalized_name)
358 .bind(request.recipient)
359 .bind(request.signing_public_key)
360 .bind(&now)
361 .bind(&now)
362 .execute(&self.pool)
363 .await?;
364 Ok(())
365 }
366
367 pub async fn revoke_tokens(
368 &self,
369 project_id: &str,
370 token_ids: &[String],
371 ) -> Result<(), sqlx::Error> {
372 let now = time::OffsetDateTime::now_utc().to_string();
373 for token_id in token_ids {
374 sqlx::query(
375 "UPDATE project_tokens SET status = 'revoked', revoked_at = ? WHERE project_id = ? AND token_id = ?"
376 )
377 .bind(&now)
378 .bind(project_id)
379 .bind(token_id)
380 .execute(&self.pool)
381 .await?;
382 }
383 Ok(())
384 }
385
386 pub async fn list_project_tokens(
387 &self,
388 project_id: &str,
389 ) -> Result<
390 Vec<(
391 String,
392 String,
393 Option<String>,
394 String,
395 String,
396 Option<String>,
397 Option<String>,
398 )>,
399 sqlx::Error,
400 > {
401 let rows = sqlx::query(
402 "SELECT token_id, capabilities_json, member_id, status, created_at, activated_at, revoked_at FROM project_tokens WHERE project_id = ? ORDER BY created_at DESC"
403 )
404 .bind(project_id)
405 .fetch_all(&self.pool)
406 .await?;
407
408 Ok(rows
409 .into_iter()
410 .map(|r| {
411 (
412 r.try_get("token_id").unwrap_or_default(),
413 r.try_get("capabilities_json").unwrap_or_default(),
414 r.try_get("member_id").ok(),
415 r.try_get("status").unwrap_or_default(),
416 r.try_get("created_at").unwrap_or_default(),
417 r.try_get("activated_at").ok(),
418 r.try_get("revoked_at").ok(),
419 )
420 })
421 .collect())
422 }
423
424 pub async fn get_project_meta(
425 &self,
426 project_id: &str,
427 ) -> Result<Option<(Option<String>, Option<String>)>, sqlx::Error> {
428 let row = sqlx::query("SELECT kagi_json, access_json FROM projects WHERE project_id = ?")
429 .bind(project_id)
430 .fetch_optional(&self.pool)
431 .await?;
432
433 match row {
434 Some(r) => {
435 let k = r.try_get::<Option<String>, _>("kagi_json")?;
436 let a = r.try_get::<Option<String>, _>("access_json")?;
437 Ok(Some((k, a)))
438 }
439 None => Ok(None),
440 }
441 }
442
443 pub async fn has_admin_token(&self) -> Result<bool, sqlx::Error> {
444 let row = sqlx::query("SELECT COUNT(*) as cnt FROM admin_tokens WHERE status = 'active'")
445 .fetch_one(&self.pool)
446 .await?;
447 let count: i64 = row.try_get("cnt").unwrap_or(0);
448 Ok(count > 0)
449 }
450
451 pub async fn create_admin_token(
452 &self,
453 token_id: &str,
454 token_hash: &str,
455 capabilities_json: &str,
456 created_at: &str,
457 ) -> Result<(), sqlx::Error> {
458 sqlx::query(
459 "INSERT INTO admin_tokens (token_id, token_hash, capabilities_json, status, created_at) VALUES (?, ?, ?, 'active', ?)"
460 )
461 .bind(token_id)
462 .bind(token_hash)
463 .bind(capabilities_json)
464 .bind(created_at)
465 .execute(&self.pool)
466 .await?;
467 Ok(())
468 }
469
470 pub async fn authenticate_admin_token(
471 &self,
472 token_hash: &str,
473 ) -> Result<Option<(String, Vec<String>)>, sqlx::Error> {
474 let row = sqlx::query(
475 "SELECT token_id, capabilities_json FROM admin_tokens WHERE token_hash = ? AND status = 'active'"
476 )
477 .bind(token_hash)
478 .fetch_optional(&self.pool)
479 .await?;
480
481 Ok(row.map(|r| {
482 let token_id: String = r.try_get("token_id").unwrap_or_default();
483 let caps_json: String = r.try_get("capabilities_json").unwrap_or_default();
484 let caps: Vec<String> = serde_json::from_str(&caps_json).unwrap_or_default();
485 (token_id, caps)
486 }))
487 }
488
489 pub async fn create_project_request(
490 &self,
491 project_id: &str,
492 requester_member_id: &str,
493 requester_name: &str,
494 requester_recipient: &str,
495 claim_secret_hash: &str,
496 kagi_json: Option<&str>,
497 ) -> Result<(), sqlx::Error> {
498 let now = time::OffsetDateTime::now_utc().to_string();
499 sqlx::query(
500 "INSERT INTO project_requests (project_id, requester_member_id, requester_name, requester_recipient, claim_secret_hash, kagi_json, status, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, 'pending', ?, ?)"
501 )
502 .bind(project_id)
503 .bind(requester_member_id)
504 .bind(requester_name)
505 .bind(requester_recipient)
506 .bind(claim_secret_hash)
507 .bind(kagi_json)
508 .bind(&now)
509 .bind(&now)
510 .execute(&self.pool)
511 .await?;
512 Ok(())
513 }
514
515 pub async fn list_project_requests(
516 &self,
517 ) -> Result<
518 Vec<(
519 String,
520 String,
521 String,
522 String,
523 String,
524 Option<String>,
525 String,
526 )>,
527 sqlx::Error,
528 > {
529 let rows = sqlx::query(
530 "SELECT project_id, requester_member_id, requester_name, requester_recipient, claim_secret_hash, kagi_json, status FROM project_requests WHERE status = 'pending'"
531 )
532 .fetch_all(&self.pool)
533 .await?;
534
535 Ok(rows
536 .into_iter()
537 .map(|r| {
538 (
539 r.try_get("project_id").unwrap_or_default(),
540 r.try_get("requester_member_id").unwrap_or_default(),
541 r.try_get("requester_name").unwrap_or_default(),
542 r.try_get("requester_recipient").unwrap_or_default(),
543 r.try_get("claim_secret_hash").unwrap_or_default(),
544 r.try_get("kagi_json").ok(),
545 r.try_get("status").unwrap_or_default(),
546 )
547 })
548 .collect())
549 }
550
551 pub async fn get_project_request(
552 &self,
553 project_id: &str,
554 ) -> Result<
555 Option<(
556 String,
557 String,
558 String,
559 String,
560 String,
561 Option<String>,
562 String,
563 )>,
564 sqlx::Error,
565 > {
566 let row = sqlx::query(
567 "SELECT project_id, requester_member_id, requester_name, requester_recipient, claim_secret_hash, kagi_json, status FROM project_requests WHERE project_id = ?"
568 )
569 .bind(project_id)
570 .fetch_optional(&self.pool)
571 .await?;
572
573 Ok(row.map(|r| {
574 (
575 r.try_get("project_id").unwrap_or_default(),
576 r.try_get("requester_member_id").unwrap_or_default(),
577 r.try_get("requester_name").unwrap_or_default(),
578 r.try_get("requester_recipient").unwrap_or_default(),
579 r.try_get("claim_secret_hash").unwrap_or_default(),
580 r.try_get("kagi_json").ok(),
581 r.try_get("status").unwrap_or_default(),
582 )
583 }))
584 }
585
586 #[allow(dead_code)]
587 pub async fn delete_project_request(&self, project_id: &str) -> Result<(), sqlx::Error> {
588 sqlx::query("DELETE FROM project_requests WHERE project_id = ?")
589 .bind(project_id)
590 .execute(&self.pool)
591 .await?;
592 Ok(())
593 }
594
595 #[allow(dead_code)]
596 pub async fn create_project_member(
597 &self,
598 req: CreateProjectMemberRequest<'_>,
599 ) -> Result<(), sqlx::Error> {
600 let now = time::OffsetDateTime::now_utc().to_string();
601 sqlx::query(
602 "INSERT INTO project_members (project_id, member_id, name, role, status, recipient, claim_secret_hash, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"
603 )
604 .bind(req.project_id)
605 .bind(req.member_id)
606 .bind(req.name)
607 .bind(req.role)
608 .bind(req.status)
609 .bind(req.recipient)
610 .bind(req.claim_secret_hash)
611 .bind(&now)
612 .bind(&now)
613 .execute(&self.pool)
614 .await?;
615 Ok(())
616 }
617
618 pub async fn get_project_member(
619 &self,
620 project_id: &str,
621 member_id: &str,
622 ) -> Result<Option<(String, String, String, String, String)>, sqlx::Error> {
623 let row = sqlx::query(
624 "SELECT name, role, status, recipient, claim_secret_hash FROM project_members WHERE project_id = ? AND member_id = ?"
625 )
626 .bind(project_id)
627 .bind(member_id)
628 .fetch_optional(&self.pool)
629 .await?;
630 Ok(row.map(|r| {
631 let name: String = r.try_get("name").unwrap_or_default();
632 let role: String = r.try_get("role").unwrap_or_default();
633 let status: String = r.try_get("status").unwrap_or_default();
634 let recipient: String = r.try_get("recipient").unwrap_or_default();
635 let claim_secret_hash: String = r.try_get("claim_secret_hash").unwrap_or_default();
636 (name, role, status, recipient, claim_secret_hash)
637 }))
638 }
639
640 #[allow(dead_code)]
641 pub async fn save_wrapped_project_token(
642 &self,
643 project_id: &str,
644 member_id: &str,
645 wrapped: &str,
646 ) -> Result<(), sqlx::Error> {
647 sqlx::query(
648 "UPDATE project_members SET wrapped_project_token = ? WHERE project_id = ? AND member_id = ?"
649 )
650 .bind(wrapped)
651 .bind(project_id)
652 .bind(member_id)
653 .execute(&self.pool)
654 .await?;
655 Ok(())
656 }
657
658 pub async fn get_wrapped_project_token(
659 &self,
660 project_id: &str,
661 member_id: &str,
662 ) -> Result<Option<String>, sqlx::Error> {
663 let row = sqlx::query(
664 "SELECT wrapped_project_token FROM project_members WHERE project_id = ? AND member_id = ?"
665 )
666 .bind(project_id)
667 .bind(member_id)
668 .fetch_optional(&self.pool)
669 .await?;
670 Ok(row.and_then(|r| r.try_get("wrapped_project_token").ok()))
671 }
672
673 pub async fn get_project_member_role(
674 &self,
675 project_id: &str,
676 member_id: &str,
677 ) -> Result<Option<String>, sqlx::Error> {
678 let row =
679 sqlx::query("SELECT role FROM project_members WHERE project_id = ? AND member_id = ?")
680 .bind(project_id)
681 .bind(member_id)
682 .fetch_optional(&self.pool)
683 .await?;
684
685 Ok(row.map(|r| r.try_get("role").unwrap_or_default()))
686 }
687
688 pub async fn approve_project_request_tx(
689 &self,
690 req: ApproveProjectRequest<'_>,
691 ) -> Result<(), sqlx::Error> {
692 let mut tx = self.pool.begin().await?;
693 let now = time::OffsetDateTime::now_utc().to_string();
694
695 sqlx::query(
696 "INSERT INTO projects (project_id, revision, created_at, updated_at) VALUES (?, 0, ?, ?)"
697 )
698 .bind(req.project_id)
699 .bind(&now)
700 .bind(&now)
701 .execute(&mut *tx)
702 .await?;
703
704 sqlx::query(
705 "INSERT INTO project_members (project_id, member_id, name, role, status, recipient, claim_secret_hash, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"
706 )
707 .bind(req.project_id)
708 .bind(req.requester_member_id)
709 .bind(req.requester_name)
710 .bind("admin")
711 .bind("active")
712 .bind(req.requester_recipient)
713 .bind(req.claim_secret_hash)
714 .bind(&now)
715 .bind(&now)
716 .execute(&mut *tx)
717 .await?;
718
719 sqlx::query(
720 "INSERT INTO project_tokens (project_id, token_id, token_hash, capabilities_json, member_id, status, created_at) VALUES (?, ?, ?, ?, ?, 'active', ?)"
721 )
722 .bind(req.project_id)
723 .bind(req.token_id)
724 .bind(req.token_hash)
725 .bind(req.caps_json)
726 .bind(req.requester_member_id)
727 .bind(&now)
728 .execute(&mut *tx)
729 .await?;
730
731 sqlx::query(
732 "UPDATE project_members SET wrapped_project_token = ? WHERE project_id = ? AND member_id = ?"
733 )
734 .bind(req.wrapped_b64)
735 .bind(req.project_id)
736 .bind(req.requester_member_id)
737 .execute(&mut *tx)
738 .await?;
739
740 sqlx::query("DELETE FROM project_requests WHERE project_id = ?")
741 .bind(req.project_id)
742 .execute(&mut *tx)
743 .await?;
744
745 tx.commit().await?;
746 Ok(())
747 }
748
749 pub async fn delete_project(&self, project_id: &str) -> Result<(), sqlx::Error> {
750 sqlx::query("DELETE FROM projects WHERE project_id = ?")
751 .bind(project_id)
752 .execute(&self.pool)
753 .await?;
754 Ok(())
755 }
756
757 pub async fn list_projects(
758 &self,
759 ) -> Result<Vec<(String, i64, Option<String>, String)>, sqlx::Error> {
760 let rows = sqlx::query("SELECT project_id, revision, kagi_json, created_at FROM projects")
761 .fetch_all(&self.pool)
762 .await?;
763
764 Ok(rows
765 .into_iter()
766 .map(|r| {
767 (
768 r.try_get("project_id").unwrap_or_default(),
769 r.try_get("revision").unwrap_or_default(),
770 r.try_get("kagi_json").ok(),
771 r.try_get("created_at").unwrap_or_default(),
772 )
773 })
774 .collect())
775 }
776
777 #[allow(clippy::too_many_arguments)]
778 pub async fn create_audit_event(
779 &self,
780 event_id: &str,
781 project_id: Option<&str>,
782 actor_member_id: Option<&str>,
783 actor_token_id: Option<&str>,
784 event_type: &str,
785 request_id: Option<&str>,
786 remote_addr: Option<&str>,
787 metadata_json: Option<&str>,
788 ) -> Result<(), sqlx::Error> {
789 let now = time::OffsetDateTime::now_utc().to_string();
790 sqlx::query(
791 "INSERT INTO audit_events (event_id, created_at, project_id, actor_member_id, actor_token_id, event_type, request_id, remote_addr, metadata_json) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"
792 )
793 .bind(event_id)
794 .bind(&now)
795 .bind(project_id)
796 .bind(actor_member_id)
797 .bind(actor_token_id)
798 .bind(event_type)
799 .bind(request_id)
800 .bind(remote_addr)
801 .bind(metadata_json)
802 .execute(&self.pool)
803 .await?;
804 Ok(())
805 }
806
807 #[allow(dead_code)]
808 pub async fn list_audit_events(
809 &self,
810 project_id: Option<&str>,
811 limit: i64,
812 ) -> Result<
813 Vec<(
814 String,
815 String,
816 Option<String>,
817 Option<String>,
818 Option<String>,
819 String,
820 Option<String>,
821 Option<String>,
822 Option<String>,
823 )>,
824 sqlx::Error,
825 > {
826 let rows = if let Some(pid) = project_id {
827 sqlx::query("SELECT event_id, created_at, project_id, actor_member_id, actor_token_id, event_type, request_id, remote_addr, metadata_json FROM audit_events WHERE project_id = ? ORDER BY created_at DESC LIMIT ?")
828 .bind(pid)
829 .bind(limit)
830 .fetch_all(&self.pool)
831 .await?
832 } else {
833 sqlx::query("SELECT event_id, created_at, project_id, actor_member_id, actor_token_id, event_type, request_id, remote_addr, metadata_json FROM audit_events ORDER BY created_at DESC LIMIT ?")
834 .bind(limit)
835 .fetch_all(&self.pool)
836 .await?
837 };
838
839 Ok(rows
840 .into_iter()
841 .map(|r| {
842 (
843 r.try_get("event_id").unwrap_or_default(),
844 r.try_get("created_at").unwrap_or_default(),
845 r.try_get("project_id").ok(),
846 r.try_get("actor_member_id").ok(),
847 r.try_get("actor_token_id").ok(),
848 r.try_get("event_type").unwrap_or_default(),
849 r.try_get("request_id").ok(),
850 r.try_get("remote_addr").ok(),
851 r.try_get("metadata_json").ok(),
852 )
853 })
854 .collect())
855 }
856
857 pub async fn request_id_seen(
858 &self,
859 project_id: &str,
860 request_id: &str,
861 event_type: &str,
862 ) -> Result<bool, sqlx::Error> {
863 let seen: Option<i64> = sqlx::query_scalar(
864 "SELECT 1 FROM audit_events WHERE project_id = ? AND request_id = ? AND event_type = ? LIMIT 1",
865 )
866 .bind(project_id)
867 .bind(request_id)
868 .bind(event_type)
869 .fetch_optional(&self.pool)
870 .await?;
871
872 Ok(seen.is_some())
873 }
874
875 pub async fn get_metrics(&self) -> Result<(i64, i64, i64, i64), sqlx::Error> {
876 let projects: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM projects")
877 .fetch_one(&self.pool)
878 .await?;
879 let tokens: i64 =
880 sqlx::query_scalar("SELECT COUNT(*) FROM project_tokens WHERE status != 'revoked'")
881 .fetch_one(&self.pool)
882 .await?;
883 let admins: i64 =
884 sqlx::query_scalar("SELECT COUNT(*) FROM admin_tokens WHERE status = 'active'")
885 .fetch_one(&self.pool)
886 .await?;
887 let db_size: i64 = sqlx::query_scalar(
888 "SELECT page_count * page_size FROM pragma_page_count(), pragma_page_size()",
889 )
890 .fetch_one(&self.pool)
891 .await?;
892 Ok((projects, tokens, admins, db_size))
893 }
894}
895
896#[cfg(test)]
897mod tests {
898 use super::*;
899 use kagi_sync::domain::project_state::ProjectFile;
900
901 async fn test_repo() -> SqliteRemoteRepository {
902 let id = rand::random::<u64>();
903 let path = std::env::temp_dir().join(format!("kagi_test_{id}.db"));
904 SqliteRemoteRepository::new_file(path).await.unwrap()
905 }
906
907 #[tokio::test]
908 async fn test_create_project_and_pull() {
909 let repo = test_repo().await;
910 repo.create_project("kgp_test").await.unwrap();
911
912 let result = repo.pull_project_state("kgp_test").await.unwrap();
913 assert!(result.is_some());
914 let (revision, files) = result.unwrap();
915 assert_eq!(revision, 0);
916 assert!(files.is_empty());
917 }
918
919 #[tokio::test]
920 async fn test_create_project_duplicate_fails() {
921 let repo = test_repo().await;
922 repo.create_project("kgp_test").await.unwrap();
923 let err = repo.create_project("kgp_test").await.unwrap_err();
924 assert!(
925 err.as_database_error()
926 .map(|d| d.is_unique_violation())
927 .unwrap_or(false)
928 );
929 }
930
931 #[tokio::test]
932 async fn test_authenticate_token() {
933 let repo = test_repo().await;
934 repo.create_project("kgp_test").await.unwrap();
935 repo.create_token(
936 "kgp_test",
937 "kgt_123",
938 "hash_correct",
939 "[\"pull\"]",
940 Some("kgm_alice"),
941 "active",
942 )
943 .await
944 .unwrap();
945
946 let found = repo
947 .authenticate_token("kgp_test", "hash_correct")
948 .await
949 .unwrap();
950 assert!(found.is_some());
951 let (token_id, caps, member_id) = found.unwrap();
952 assert_eq!(token_id, "kgt_123");
953 assert_eq!(caps, vec!["pull"]);
954 assert_eq!(member_id, Some("kgm_alice".to_string()));
955
956 let not_found = repo
957 .authenticate_token("kgp_test", "hash_wrong")
958 .await
959 .unwrap();
960 assert!(not_found.is_none());
961 }
962
963 #[tokio::test]
964 async fn test_push_and_pull_project_state() {
965 let repo = test_repo().await;
966 repo.create_project("kgp_test").await.unwrap();
967
968 let files = vec![ProjectFile {
969 path: "dev.env".into(),
970 content: "KEY=val".into(),
971 sha256: Some("abc".into()),
972 }];
973 let new_rev = repo
974 .push_project_state(PushProjectStateRequest {
975 project_id: "kgp_test",
976 base_revision: 0,
977 kagi_json: "{}",
978 access_json: "{}",
979 files: &files,
980 activate_tokens: &[],
981 revoke_tokens: &[],
982 accepted_joins: &[],
983 manifest_json: None,
984 manifest_signature: None,
985 })
986 .await
987 .unwrap();
988 assert_eq!(new_rev, 1);
989
990 let result = repo.pull_project_state("kgp_test").await.unwrap();
991 let (revision, pulled_files) = result.unwrap();
992 assert_eq!(revision, 1);
993 assert_eq!(pulled_files.len(), 1);
994 assert_eq!(pulled_files[0].path, "dev.env");
995 assert_eq!(pulled_files[0].content, "KEY=val");
996 assert_eq!(pulled_files[0].sha256, Some("abc".to_string()));
997 }
998
999 #[tokio::test]
1000 async fn test_push_conflict() {
1001 let repo = test_repo().await;
1002 repo.create_project("kgp_test").await.unwrap();
1003
1004 let err = repo
1005 .push_project_state(PushProjectStateRequest {
1006 project_id: "kgp_test",
1007 base_revision: 99,
1008 kagi_json: "{}",
1009 access_json: "{}",
1010 files: &[],
1011 activate_tokens: &[],
1012 revoke_tokens: &[],
1013 accepted_joins: &[],
1014 manifest_json: None,
1015 manifest_signature: None,
1016 })
1017 .await
1018 .unwrap_err();
1019 assert!(matches!(err, sqlx::Error::RowNotFound));
1020 }
1021
1022 #[tokio::test]
1023 async fn test_join_request_flow() {
1024 let repo = test_repo().await;
1025 repo.create_project("kgp_test").await.unwrap();
1026
1027 repo.upsert_join_request(UpsertJoinRequest {
1028 project_id: "kgp_test",
1029 member_id: "kgm_bob",
1030 request_token_id: "kgt_req1",
1031 name: "Bob",
1032 normalized_name: "bob",
1033 recipient: "age1...",
1034 signing_public_key: "signing-key",
1035 })
1036 .await
1037 .unwrap();
1038
1039 let pending = repo.list_join_requests("kgp_test").await.unwrap();
1040 assert_eq!(pending.len(), 1);
1041 assert_eq!(pending[0].0, "kgm_bob");
1042 assert_eq!(pending[0].1, "Bob");
1043 assert_eq!(pending[0].2, "age1...");
1044 assert_eq!(pending[0].3.as_deref(), Some("signing-key"));
1045 }
1046
1047 #[tokio::test]
1048 async fn test_revoke_token() {
1049 let repo = test_repo().await;
1050 repo.create_project("kgp_test").await.unwrap();
1051 repo.create_token(
1052 "kgp_test",
1053 "kgt_123",
1054 "hash_value",
1055 "[\"pull\"]",
1056 None,
1057 "active",
1058 )
1059 .await
1060 .unwrap();
1061
1062 repo.revoke_tokens("kgp_test", &["kgt_123".into()])
1063 .await
1064 .unwrap();
1065
1066 let found = repo
1067 .authenticate_token("kgp_test", "hash_value")
1068 .await
1069 .unwrap();
1070 assert!(found.is_none());
1071 }
1072
1073 #[tokio::test]
1074 async fn test_get_project_meta() {
1075 let repo = test_repo().await;
1076 repo.create_project("kgp_test").await.unwrap();
1077
1078 let files = vec![ProjectFile {
1079 path: "a".into(),
1080 content: "b".into(),
1081 sha256: None,
1082 }];
1083 repo.push_project_state(PushProjectStateRequest {
1084 project_id: "kgp_test",
1085 base_revision: 0,
1086 kagi_json: "{\"k\":1}",
1087 access_json: "{\"a\":2}",
1088 files: &files,
1089 activate_tokens: &[],
1090 revoke_tokens: &[],
1091 accepted_joins: &[],
1092 manifest_json: None,
1093 manifest_signature: None,
1094 })
1095 .await
1096 .unwrap();
1097
1098 let meta = repo.get_project_meta("kgp_test").await.unwrap();
1099 assert!(meta.is_some());
1100 let (kagi_json, access_json) = meta.unwrap();
1101 assert_eq!(kagi_json, Some("{\"k\":1}".to_string()));
1102 assert_eq!(access_json, Some("{\"a\":2}".to_string()));
1103 }
1104
1105 #[tokio::test]
1106 async fn test_admin_token_lifecycle() {
1107 let repo = test_repo().await;
1108 assert!(!repo.has_admin_token().await.unwrap());
1109
1110 let created_at = time::OffsetDateTime::now_utc().to_string();
1111 repo.create_admin_token("kat_123", "hash_admin", "[\"admin\"]", &created_at)
1112 .await
1113 .unwrap();
1114
1115 assert!(repo.has_admin_token().await.unwrap());
1116
1117 let found = repo.authenticate_admin_token("hash_admin").await.unwrap();
1118 assert!(found.is_some());
1119 let (token_id, caps) = found.unwrap();
1120 assert_eq!(token_id, "kat_123");
1121 assert_eq!(caps, vec!["admin"]);
1122 }
1123
1124 #[tokio::test]
1125 async fn test_authenticate_admin_token_wrong_hash() {
1126 let repo = test_repo().await;
1127 let created_at = time::OffsetDateTime::now_utc().to_string();
1128 repo.create_admin_token("kat_123", "hash_correct", "[\"admin\"]", &created_at)
1129 .await
1130 .unwrap();
1131
1132 let not_found = repo.authenticate_admin_token("hash_wrong").await.unwrap();
1133 assert!(not_found.is_none());
1134 }
1135
1136 #[tokio::test]
1137 async fn test_project_request_lifecycle() {
1138 let repo = test_repo().await;
1139 repo.create_project_request(
1140 "kgp_req",
1141 "kgm_alice",
1142 "Alice",
1143 "age1...",
1144 "cs:test",
1145 Some("{\"key\":1}"),
1146 )
1147 .await
1148 .unwrap();
1149
1150 let requests = repo.list_project_requests().await.unwrap();
1151 assert_eq!(requests.len(), 1);
1152 let (project_id, member_id, name, recipient, _hash, kagi_json, status) = &requests[0];
1153 assert_eq!(project_id, "kgp_req");
1154 assert_eq!(member_id, "kgm_alice");
1155 assert_eq!(name, "Alice");
1156 assert_eq!(recipient, "age1...");
1157 assert_eq!(kagi_json.as_deref(), Some("{\"key\":1}"));
1158 assert_eq!(status, "pending");
1159
1160 let single = repo.get_project_request("kgp_req").await.unwrap();
1161 assert!(single.is_some());
1162 let (project_id2, member_id2, name2, recipient2, _hash2, kagi_json2, status2) =
1163 single.unwrap();
1164 assert_eq!(project_id2, "kgp_req");
1165 assert_eq!(member_id2, "kgm_alice");
1166 assert_eq!(name2, "Alice");
1167 assert_eq!(recipient2, "age1...");
1168 assert_eq!(kagi_json2.as_deref(), Some("{\"key\":1}"));
1169 assert_eq!(status2, "pending");
1170
1171 repo.delete_project_request("kgp_req").await.unwrap();
1172 let after_delete = repo.list_project_requests().await.unwrap();
1173 assert!(after_delete.is_empty());
1174 }
1175
1176 #[tokio::test]
1177 async fn test_project_member_lifecycle() {
1178 let repo = test_repo().await;
1179 repo.create_project("kgp_test").await.unwrap();
1180
1181 repo.create_project_member(CreateProjectMemberRequest {
1182 project_id: "kgp_test",
1183 member_id: "kgm_bob",
1184 name: "Bob",
1185 role: "admin",
1186 status: "active",
1187 recipient: "age1...",
1188 claim_secret_hash: "cs:test",
1189 })
1190 .await
1191 .unwrap();
1192
1193 let role = repo
1194 .get_project_member_role("kgp_test", "kgm_bob")
1195 .await
1196 .unwrap();
1197 assert_eq!(role, Some("admin".to_string()));
1198
1199 repo.delete_project("kgp_test").await.unwrap();
1200
1201 let role_after = repo
1202 .get_project_member_role("kgp_test", "kgm_bob")
1203 .await
1204 .unwrap();
1205 assert!(role_after.is_none());
1206 }
1207
1208 #[tokio::test]
1209 async fn test_list_projects() {
1210 let repo = test_repo().await;
1211 repo.create_project("kgp_a").await.unwrap();
1212 repo.create_project("kgp_b").await.unwrap();
1213
1214 let projects = repo.list_projects().await.unwrap();
1215 assert_eq!(projects.len(), 2);
1216 let ids: Vec<String> = projects.iter().map(|p| p.0.clone()).collect();
1217 assert!(ids.contains(&"kgp_a".to_string()));
1218 assert!(ids.contains(&"kgp_b".to_string()));
1219 }
1220
1221 #[tokio::test]
1222 async fn test_audit_event_lifecycle() {
1223 let repo = test_repo().await;
1224 repo.create_project("kgp_test").await.unwrap();
1225
1226 repo.create_audit_event(
1227 "kae_1",
1228 Some("kgp_test"),
1229 Some("kgm_alice"),
1230 Some("kgt_123"),
1231 "push",
1232 Some("kgr_1"),
1233 Some("127.0.0.1"),
1234 Some("{\"revision\":1}"),
1235 )
1236 .await
1237 .unwrap();
1238
1239 let events = repo.list_audit_events(Some("kgp_test"), 10).await.unwrap();
1240 assert_eq!(events.len(), 1);
1241 let (
1242 event_id,
1243 _created_at,
1244 project_id,
1245 actor_member_id,
1246 actor_token_id,
1247 event_type,
1248 request_id,
1249 remote_addr,
1250 metadata_json,
1251 ) = &events[0];
1252 assert_eq!(event_id, "kae_1");
1253 assert_eq!(project_id.as_deref(), Some("kgp_test"));
1254 assert_eq!(actor_member_id.as_deref(), Some("kgm_alice"));
1255 assert_eq!(actor_token_id.as_deref(), Some("kgt_123"));
1256 assert_eq!(event_type, "push");
1257 assert_eq!(request_id.as_deref(), Some("kgr_1"));
1258 assert_eq!(remote_addr.as_deref(), Some("127.0.0.1"));
1259 assert_eq!(metadata_json.as_deref(), Some("{\"revision\":1}"));
1260 }
1261
1262 #[tokio::test]
1263 async fn test_audit_event_does_not_leak_sensitive_data() {
1264 let repo = test_repo().await;
1265 repo.create_project("kgp_test").await.unwrap();
1266
1267 repo.create_audit_event(
1268 "kae_1",
1269 Some("kgp_test"),
1270 None,
1271 None,
1272 "project_request_created",
1273 Some("kgr_1"),
1274 Some("127.0.0.1"),
1275 Some("{\"requester_name\":\"Alice\"}"),
1276 )
1277 .await
1278 .unwrap();
1279
1280 let events = repo.list_audit_events(Some("kgp_test"), 10).await.unwrap();
1281 assert_eq!(events.len(), 1);
1282 let metadata_json = &events[0].8;
1283 let meta = metadata_json.as_deref().unwrap_or("");
1284 assert!(!meta.contains("secret"));
1285 assert!(!meta.contains("token"));
1286 assert!(!meta.contains("claim_secret"));
1287 }
1288
1289 #[tokio::test]
1290 async fn test_request_id_seen() {
1291 let repo = test_repo().await;
1292 repo.create_project("kgp_test").await.unwrap();
1293
1294 let request_id = "kgr_test";
1295 assert!(
1296 !repo
1297 .request_id_seen("kgp_test", request_id, "push")
1298 .await
1299 .unwrap()
1300 );
1301
1302 repo.create_audit_event(
1303 "kae_1",
1304 Some("kgp_test"),
1305 Some("kgm_alice"),
1306 Some("kgt_123"),
1307 "push",
1308 Some(request_id),
1309 Some("127.0.0.1"),
1310 Some("{}"),
1311 )
1312 .await
1313 .unwrap();
1314
1315 assert!(
1316 repo.request_id_seen("kgp_test", request_id, "push")
1317 .await
1318 .unwrap()
1319 );
1320 assert!(
1321 !repo
1322 .request_id_seen("kgp_test", request_id, "join_request")
1323 .await
1324 .unwrap()
1325 );
1326 }
1327
1328 #[tokio::test]
1329 async fn test_metrics() {
1330 let repo = test_repo().await;
1331 repo.create_project("kgp_test").await.unwrap();
1332 let (projects, tokens, admins, db_size) = repo.get_metrics().await.unwrap();
1333 assert_eq!(projects, 1);
1334 assert_eq!(tokens, 0);
1335 assert_eq!(admins, 0);
1336 assert!(db_size > 0);
1337 }
1338}