1use kagi_sync::domain::project_state::ProjectFile;
2use sha2::Digest;
3use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
4use sqlx::{Pool, Row, Sqlite};
5use std::path::Path;
6use std::time::Duration;
7
8pub struct SqliteRemoteRepository {
9 pool: Pool<Sqlite>,
10}
11
12pub struct PushProjectStateRequest<'a> {
13 pub project_id: &'a str,
14 pub base_revision: i64,
15 pub kagi_json: &'a str,
16 pub access_json: &'a str,
17 pub files: &'a [ProjectFile],
18 pub activate_tokens: &'a [String],
19 pub revoke_tokens: &'a [String],
20 pub accepted_joins: &'a [String],
21 pub manifest_json: Option<&'a str>,
22 pub manifest_signature: Option<&'a str>,
23}
24
/// Borrowed arguments for `SqliteRemoteRepository::create_project_member`.
///
/// Each field is bound to the `project_members` column of the same name.
/// `Debug` is intentionally not derived so the claim-secret hash cannot be
/// written to logs by accident.
#[derive(Clone, Copy)]
pub struct CreateProjectMemberRequest<'a> {
    /// Project the member belongs to.
    pub project_id: &'a str,
    /// Identifier of the member within the project.
    pub member_id: &'a str,
    /// Display name of the member.
    pub name: &'a str,
    /// Role string stored for the member (e.g. `"admin"`).
    pub role: &'a str,
    /// Status string stored for the member (e.g. `"active"`).
    pub status: &'a str,
    /// Recipient value stored for the member.
    pub recipient: &'a str,
    /// Hash of the member's claim secret.
    pub claim_secret_hash: &'a str,
}
34
/// Borrowed arguments for `SqliteRemoteRepository::upsert_join_request`.
///
/// Each field is bound to the `join_requests` column of the same name.
#[derive(Clone, Copy)]
pub struct UpsertJoinRequest<'a> {
    /// Project the join request targets.
    pub project_id: &'a str,
    /// Identifier of the member asking to join.
    pub member_id: &'a str,
    /// Token id the request was made with; an existing row is only updated
    /// when its stored token id equals this one.
    pub request_token_id: &'a str,
    /// Display name of the requester.
    pub name: &'a str,
    /// Normalized form of `name`, stored in its own column.
    pub normalized_name: &'a str,
    /// Recipient value stored for the requester.
    pub recipient: &'a str,
    /// Signing public key stored for the requester.
    pub signing_public_key: &'a str,
}
44
/// Borrowed arguments for `SqliteRemoteRepository::approve_project_request_tx`.
///
/// `Debug` is intentionally not derived so the token hash, claim-secret hash
/// and wrapped token cannot be written to logs by accident.
#[derive(Clone, Copy)]
pub struct ApproveProjectRequest<'a> {
    /// Project to create from the pending request.
    pub project_id: &'a str,
    /// Member id of the requester, who becomes the first project member.
    pub requester_member_id: &'a str,
    /// Display name stored for the requester.
    pub requester_name: &'a str,
    /// Recipient value stored for the requester.
    pub requester_recipient: &'a str,
    /// Hash of the requester's claim secret.
    pub claim_secret_hash: &'a str,
    /// Id of the project token created for the requester.
    pub token_id: &'a str,
    /// Hash of the project token created for the requester.
    pub token_hash: &'a str,
    /// JSON text stored as the token's `capabilities_json`.
    pub caps_json: &'a str,
    /// Value stored in the member's `wrapped_project_token` column.
    pub wrapped_b64: &'a str,
}
56
57impl SqliteRemoteRepository {
58 pub async fn new_file(path: impl AsRef<Path>) -> Result<Self, sqlx::Error> {
59 let opts = SqliteConnectOptions::new()
60 .filename(path.as_ref())
61 .create_if_missing(true);
62 Self::connect_with(opts).await
63 }
64
65 async fn connect_with(opts: SqliteConnectOptions) -> Result<Self, sqlx::Error> {
66 let pool = SqlitePoolOptions::new()
67 .max_connections(5)
68 .min_connections(1)
69 .acquire_timeout(Duration::from_secs(5))
70 .connect_with(opts)
71 .await?;
72
73 sqlx::query("PRAGMA foreign_keys = ON;")
74 .execute(&pool)
75 .await?;
76 sqlx::query("PRAGMA journal_mode = WAL;")
77 .execute(&pool)
78 .await?;
79 sqlx::query("PRAGMA synchronous = FULL;")
80 .execute(&pool)
81 .await?;
82 sqlx::query("PRAGMA busy_timeout = 5000;")
83 .execute(&pool)
84 .await?;
85
86 sqlx::migrate!("./migrations").run(&pool).await?;
87
88 Ok(Self { pool })
89 }
90
91 pub async fn create_project(&self, project_id: &str) -> Result<(), sqlx::Error> {
92 let now = time::OffsetDateTime::now_utc().to_string();
93 sqlx::query(
94 "INSERT INTO projects (project_id, revision, created_at, updated_at) VALUES (?, 0, ?, ?)"
95 )
96 .bind(project_id)
97 .bind(&now)
98 .bind(&now)
99 .execute(&self.pool)
100 .await?;
101 Ok(())
102 }
103
104 pub async fn create_token(
105 &self,
106 project_id: &str,
107 token_id: &str,
108 token_hash: &str,
109 capabilities_json: &str,
110 member_id: Option<&str>,
111 status: &str,
112 ) -> Result<(), sqlx::Error> {
113 let now = time::OffsetDateTime::now_utc().to_string();
114 sqlx::query(
115 "INSERT INTO project_tokens (project_id, token_id, token_hash, capabilities_json, member_id, status, created_at)
116 VALUES (?, ?, ?, ?, ?, ?, ?)"
117 )
118 .bind(project_id)
119 .bind(token_id)
120 .bind(token_hash)
121 .bind(capabilities_json)
122 .bind(member_id)
123 .bind(status)
124 .bind(&now)
125 .execute(&self.pool)
126 .await?;
127 Ok(())
128 }
129
130 pub async fn authenticate_token(
131 &self,
132 project_id: &str,
133 token_hash: &str,
134 ) -> Result<Option<(String, Vec<String>, Option<String>)>, sqlx::Error> {
135 let row = sqlx::query(
136 "SELECT token_id, capabilities_json, member_id FROM project_tokens
137 WHERE project_id = ? AND token_hash = ? AND status = 'active'",
138 )
139 .bind(project_id)
140 .bind(token_hash)
141 .fetch_optional(&self.pool)
142 .await?;
143
144 Ok(row.map(|r| {
145 let token_id: String = r.try_get("token_id").unwrap_or_default();
146 let caps_json: String = r.try_get("capabilities_json").unwrap_or_default();
147 let member_id: Option<String> = r.try_get("member_id").ok();
148 let caps: Vec<String> = serde_json::from_str(&caps_json).unwrap_or_default();
149 (token_id, caps, member_id)
150 }))
151 }
152
153 pub async fn push_project_state(
154 &self,
155 request: PushProjectStateRequest<'_>,
156 ) -> Result<i64, sqlx::Error> {
157 let mut tx = self.pool.begin().await?;
158
159 let current_row = sqlx::query("SELECT revision FROM projects WHERE project_id = ?")
160 .bind(request.project_id)
161 .fetch_optional(&mut *tx)
162 .await?;
163 let current_revision = current_row
164 .map(|r| r.try_get::<i64, _>("revision").unwrap_or(0))
165 .unwrap_or(0);
166 if current_revision != request.base_revision {
167 return Err(sqlx::Error::RowNotFound);
168 }
169
170 sqlx::query("DELETE FROM project_files WHERE project_id = ?")
171 .bind(request.project_id)
172 .execute(&mut *tx)
173 .await?;
174
175 let now = time::OffsetDateTime::now_utc().to_string();
176 for file in request.files {
177 sqlx::query(
178 "INSERT INTO project_files (project_id, path, content, sha256, updated_at) VALUES (?, ?, ?, ?, ?)"
179 )
180 .bind(request.project_id)
181 .bind(&file.path)
182 .bind(&file.content)
183 .bind(&file.sha256)
184 .bind(&now)
185 .execute(&mut *tx)
186 .await?;
187 }
188
189 let new_revision = request.base_revision + 1;
190 sqlx::query(
191 "UPDATE projects SET revision = ?, kagi_json = ?, access_json = ?, updated_at = ? WHERE project_id = ?"
192 )
193 .bind(new_revision)
194 .bind(request.kagi_json)
195 .bind(request.access_json)
196 .bind(&now)
197 .bind(request.project_id)
198 .execute(&mut *tx)
199 .await?;
200
201 for token_id in request.activate_tokens {
202 sqlx::query(
203 "UPDATE project_tokens SET status = 'active', activated_at = ? WHERE project_id = ? AND token_id = ? AND status = 'pending_activation'"
204 )
205 .bind(&now)
206 .bind(request.project_id)
207 .bind(token_id)
208 .execute(&mut *tx)
209 .await?;
210 }
211
212 for token_id in request.revoke_tokens {
213 sqlx::query(
214 "UPDATE project_tokens SET status = 'revoked', revoked_at = ? WHERE project_id = ? AND token_id = ?"
215 )
216 .bind(&now)
217 .bind(request.project_id)
218 .bind(token_id)
219 .execute(&mut *tx)
220 .await?;
221 }
222
223 for member_id in request.accepted_joins {
224 sqlx::query(
225 "UPDATE join_requests SET status = 'accepted', updated_at = ? WHERE project_id = ? AND member_id = ?"
226 )
227 .bind(&now)
228 .bind(request.project_id)
229 .bind(member_id)
230 .execute(&mut *tx)
231 .await?;
232 }
233
234 if let Some(manifest_json) = request.manifest_json {
235 let manifest_hash = {
236 let mut hasher = sha2::Sha256::new();
237 hasher.update(manifest_json.as_bytes());
238 hex::encode(hasher.finalize())
239 };
240 sqlx::query(
241 "INSERT INTO manifests (project_id, revision, manifest_hash, manifest_json, manifest_signature, created_at) VALUES (?, ?, ?, ?, ?, ?)"
242 )
243 .bind(request.project_id)
244 .bind(new_revision)
245 .bind(&manifest_hash)
246 .bind(manifest_json)
247 .bind(request.manifest_signature)
248 .bind(&now)
249 .execute(&mut *tx)
250 .await?;
251 }
252
253 tx.commit().await?;
254 Ok(new_revision)
255 }
256
257 pub async fn pull_project_state(
258 &self,
259 project_id: &str,
260 ) -> Result<Option<(i64, Vec<ProjectFile>)>, sqlx::Error> {
261 let revision_row = sqlx::query("SELECT revision FROM projects WHERE project_id = ?")
262 .bind(project_id)
263 .fetch_optional(&self.pool)
264 .await?;
265 let revision = match revision_row {
266 Some(r) => r.try_get::<i64, _>("revision")?,
267 None => return Ok(None),
268 };
269
270 let file_rows =
271 sqlx::query("SELECT path, content, sha256 FROM project_files WHERE project_id = ?")
272 .bind(project_id)
273 .fetch_all(&self.pool)
274 .await?;
275
276 let project_files = file_rows
277 .into_iter()
278 .map(|r| ProjectFile {
279 path: r.try_get("path").unwrap_or_default(),
280 content: r.try_get("content").unwrap_or_default(),
281 sha256: r.try_get("sha256").ok(),
282 })
283 .collect();
284
285 Ok(Some((revision, project_files)))
286 }
287
288 pub async fn get_manifest(
289 &self,
290 project_id: &str,
291 revision: i64,
292 ) -> Result<Option<(String, String, Option<String>)>, sqlx::Error> {
293 let row = sqlx::query(
294 "SELECT manifest_hash, manifest_json, manifest_signature FROM manifests WHERE project_id = ? AND revision = ?",
295 )
296 .bind(project_id)
297 .bind(revision)
298 .fetch_optional(&self.pool)
299 .await?;
300
301 Ok(row.map(|r| {
302 (
303 r.try_get("manifest_hash").unwrap_or_default(),
304 r.try_get("manifest_json").unwrap_or_default(),
305 r.try_get("manifest_signature").ok(),
306 )
307 }))
308 }
309
310 pub async fn list_join_requests(
311 &self,
312 project_id: &str,
313 ) -> Result<Vec<(String, String, String, Option<String>, String)>, sqlx::Error> {
314 let rows = sqlx::query(
315 "SELECT member_id, name, recipient, signing_public_key, created_at FROM join_requests
316 WHERE project_id = ? AND status = 'pending'",
317 )
318 .bind(project_id)
319 .fetch_all(&self.pool)
320 .await?;
321
322 Ok(rows
323 .into_iter()
324 .map(|r| {
325 (
326 r.try_get("member_id").unwrap_or_default(),
327 r.try_get("name").unwrap_or_default(),
328 r.try_get("recipient").unwrap_or_default(),
329 r.try_get("signing_public_key").ok(),
330 r.try_get("created_at").unwrap_or_default(),
331 )
332 })
333 .collect())
334 }
335
336 pub async fn upsert_join_request(
337 &self,
338 request: UpsertJoinRequest<'_>,
339 ) -> Result<(), sqlx::Error> {
340 let now = time::OffsetDateTime::now_utc().to_string();
341 sqlx::query(
342 "INSERT INTO join_requests (project_id, member_id, request_token_id, name, normalized_name, recipient, signing_public_key, status, created_at, updated_at)
343 VALUES (?, ?, ?, ?, ?, ?, ?, 'pending', ?, ?)
344 ON CONFLICT(project_id, member_id) DO UPDATE SET
345 request_token_id = excluded.request_token_id,
346 name = excluded.name,
347 normalized_name = excluded.normalized_name,
348 recipient = excluded.recipient,
349 signing_public_key = excluded.signing_public_key,
350 updated_at = excluded.updated_at
351 WHERE join_requests.request_token_id = excluded.request_token_id"
352 )
353 .bind(request.project_id)
354 .bind(request.member_id)
355 .bind(request.request_token_id)
356 .bind(request.name)
357 .bind(request.normalized_name)
358 .bind(request.recipient)
359 .bind(request.signing_public_key)
360 .bind(&now)
361 .bind(&now)
362 .execute(&self.pool)
363 .await?;
364 Ok(())
365 }
366
367 pub async fn revoke_tokens(
368 &self,
369 project_id: &str,
370 token_ids: &[String],
371 ) -> Result<(), sqlx::Error> {
372 let now = time::OffsetDateTime::now_utc().to_string();
373 for token_id in token_ids {
374 sqlx::query(
375 "UPDATE project_tokens SET status = 'revoked', revoked_at = ? WHERE project_id = ? AND token_id = ?"
376 )
377 .bind(&now)
378 .bind(project_id)
379 .bind(token_id)
380 .execute(&self.pool)
381 .await?;
382 }
383 Ok(())
384 }
385
386 pub async fn list_project_tokens(
387 &self,
388 project_id: &str,
389 ) -> Result<
390 Vec<(
391 String,
392 String,
393 Option<String>,
394 String,
395 String,
396 Option<String>,
397 Option<String>,
398 )>,
399 sqlx::Error,
400 > {
401 let rows = sqlx::query(
402 "SELECT token_id, capabilities_json, member_id, status, created_at, activated_at, revoked_at FROM project_tokens WHERE project_id = ? ORDER BY created_at DESC"
403 )
404 .bind(project_id)
405 .fetch_all(&self.pool)
406 .await?;
407
408 Ok(rows
409 .into_iter()
410 .map(|r| {
411 (
412 r.try_get("token_id").unwrap_or_default(),
413 r.try_get("capabilities_json").unwrap_or_default(),
414 r.try_get("member_id").ok(),
415 r.try_get("status").unwrap_or_default(),
416 r.try_get("created_at").unwrap_or_default(),
417 r.try_get("activated_at").ok(),
418 r.try_get("revoked_at").ok(),
419 )
420 })
421 .collect())
422 }
423
424 pub async fn get_project_meta(
425 &self,
426 project_id: &str,
427 ) -> Result<Option<(Option<String>, Option<String>)>, sqlx::Error> {
428 let row = sqlx::query("SELECT kagi_json, access_json FROM projects WHERE project_id = ?")
429 .bind(project_id)
430 .fetch_optional(&self.pool)
431 .await?;
432
433 match row {
434 Some(r) => {
435 let k = r.try_get::<Option<String>, _>("kagi_json")?;
436 let a = r.try_get::<Option<String>, _>("access_json")?;
437 Ok(Some((k, a)))
438 }
439 None => Ok(None),
440 }
441 }
442
443 pub async fn has_admin_token(&self) -> Result<bool, sqlx::Error> {
444 let row = sqlx::query("SELECT COUNT(*) as cnt FROM admin_tokens WHERE status = 'active'")
445 .fetch_one(&self.pool)
446 .await?;
447 let count: i64 = row.try_get("cnt").unwrap_or(0);
448 Ok(count > 0)
449 }
450
451 pub async fn create_admin_token(
452 &self,
453 token_id: &str,
454 token_hash: &str,
455 capabilities_json: &str,
456 created_at: &str,
457 ) -> Result<(), sqlx::Error> {
458 sqlx::query(
459 "INSERT INTO admin_tokens (token_id, token_hash, capabilities_json, status, created_at) VALUES (?, ?, ?, 'active', ?)"
460 )
461 .bind(token_id)
462 .bind(token_hash)
463 .bind(capabilities_json)
464 .bind(created_at)
465 .execute(&self.pool)
466 .await?;
467 Ok(())
468 }
469
470 pub async fn authenticate_admin_token(
471 &self,
472 token_hash: &str,
473 ) -> Result<Option<(String, Vec<String>)>, sqlx::Error> {
474 let row = sqlx::query(
475 "SELECT token_id, capabilities_json FROM admin_tokens WHERE token_hash = ? AND status = 'active'"
476 )
477 .bind(token_hash)
478 .fetch_optional(&self.pool)
479 .await?;
480
481 Ok(row.map(|r| {
482 let token_id: String = r.try_get("token_id").unwrap_or_default();
483 let caps_json: String = r.try_get("capabilities_json").unwrap_or_default();
484 let caps: Vec<String> = serde_json::from_str(&caps_json).unwrap_or_default();
485 (token_id, caps)
486 }))
487 }
488
489 pub async fn create_project_request(
490 &self,
491 project_id: &str,
492 requester_member_id: &str,
493 requester_name: &str,
494 requester_recipient: &str,
495 claim_secret_hash: &str,
496 kagi_json: Option<&str>,
497 ) -> Result<(), sqlx::Error> {
498 let now = time::OffsetDateTime::now_utc().to_string();
499 sqlx::query(
500 "INSERT INTO project_requests (project_id, requester_member_id, requester_name, requester_recipient, claim_secret_hash, kagi_json, status, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, 'pending', ?, ?)"
501 )
502 .bind(project_id)
503 .bind(requester_member_id)
504 .bind(requester_name)
505 .bind(requester_recipient)
506 .bind(claim_secret_hash)
507 .bind(kagi_json)
508 .bind(&now)
509 .bind(&now)
510 .execute(&self.pool)
511 .await?;
512 Ok(())
513 }
514
515 pub async fn list_project_requests(
516 &self,
517 ) -> Result<
518 Vec<(
519 String,
520 String,
521 String,
522 String,
523 String,
524 Option<String>,
525 String,
526 )>,
527 sqlx::Error,
528 > {
529 let rows = sqlx::query(
530 "SELECT project_id, requester_member_id, requester_name, requester_recipient, claim_secret_hash, kagi_json, status FROM project_requests WHERE status = 'pending'"
531 )
532 .fetch_all(&self.pool)
533 .await?;
534
535 Ok(rows
536 .into_iter()
537 .map(|r| {
538 (
539 r.try_get("project_id").unwrap_or_default(),
540 r.try_get("requester_member_id").unwrap_or_default(),
541 r.try_get("requester_name").unwrap_or_default(),
542 r.try_get("requester_recipient").unwrap_or_default(),
543 r.try_get("claim_secret_hash").unwrap_or_default(),
544 r.try_get("kagi_json").ok(),
545 r.try_get("status").unwrap_or_default(),
546 )
547 })
548 .collect())
549 }
550
551 pub async fn get_project_request(
552 &self,
553 project_id: &str,
554 ) -> Result<
555 Option<(
556 String,
557 String,
558 String,
559 String,
560 String,
561 Option<String>,
562 String,
563 )>,
564 sqlx::Error,
565 > {
566 let row = sqlx::query(
567 "SELECT project_id, requester_member_id, requester_name, requester_recipient, claim_secret_hash, kagi_json, status FROM project_requests WHERE project_id = ?"
568 )
569 .bind(project_id)
570 .fetch_optional(&self.pool)
571 .await?;
572
573 Ok(row.map(|r| {
574 (
575 r.try_get("project_id").unwrap_or_default(),
576 r.try_get("requester_member_id").unwrap_or_default(),
577 r.try_get("requester_name").unwrap_or_default(),
578 r.try_get("requester_recipient").unwrap_or_default(),
579 r.try_get("claim_secret_hash").unwrap_or_default(),
580 r.try_get("kagi_json").ok(),
581 r.try_get("status").unwrap_or_default(),
582 )
583 }))
584 }
585
586 #[allow(dead_code)]
587 pub async fn delete_project_request(&self, project_id: &str) -> Result<(), sqlx::Error> {
588 sqlx::query("DELETE FROM project_requests WHERE project_id = ?")
589 .bind(project_id)
590 .execute(&self.pool)
591 .await?;
592 Ok(())
593 }
594
595 #[allow(dead_code)]
596 pub async fn create_project_member(
597 &self,
598 req: CreateProjectMemberRequest<'_>,
599 ) -> Result<(), sqlx::Error> {
600 let now = time::OffsetDateTime::now_utc().to_string();
601 sqlx::query(
602 "INSERT INTO project_members (project_id, member_id, name, role, status, recipient, claim_secret_hash, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"
603 )
604 .bind(req.project_id)
605 .bind(req.member_id)
606 .bind(req.name)
607 .bind(req.role)
608 .bind(req.status)
609 .bind(req.recipient)
610 .bind(req.claim_secret_hash)
611 .bind(&now)
612 .bind(&now)
613 .execute(&self.pool)
614 .await?;
615 Ok(())
616 }
617
618 pub async fn get_project_member(
619 &self,
620 project_id: &str,
621 member_id: &str,
622 ) -> Result<Option<(String, String, String, String, String)>, sqlx::Error> {
623 let row = sqlx::query(
624 "SELECT name, role, status, recipient, claim_secret_hash FROM project_members WHERE project_id = ? AND member_id = ?"
625 )
626 .bind(project_id)
627 .bind(member_id)
628 .fetch_optional(&self.pool)
629 .await?;
630 Ok(row.map(|r| {
631 let name: String = r.try_get("name").unwrap_or_default();
632 let role: String = r.try_get("role").unwrap_or_default();
633 let status: String = r.try_get("status").unwrap_or_default();
634 let recipient: String = r.try_get("recipient").unwrap_or_default();
635 let claim_secret_hash: String = r.try_get("claim_secret_hash").unwrap_or_default();
636 (name, role, status, recipient, claim_secret_hash)
637 }))
638 }
639
640 #[allow(dead_code)]
641 pub async fn save_wrapped_project_token(
642 &self,
643 project_id: &str,
644 member_id: &str,
645 wrapped: &str,
646 ) -> Result<(), sqlx::Error> {
647 sqlx::query(
648 "UPDATE project_members SET wrapped_project_token = ? WHERE project_id = ? AND member_id = ?"
649 )
650 .bind(wrapped)
651 .bind(project_id)
652 .bind(member_id)
653 .execute(&self.pool)
654 .await?;
655 Ok(())
656 }
657
658 pub async fn get_wrapped_project_token(
659 &self,
660 project_id: &str,
661 member_id: &str,
662 ) -> Result<Option<String>, sqlx::Error> {
663 let row = sqlx::query(
664 "SELECT wrapped_project_token FROM project_members WHERE project_id = ? AND member_id = ?"
665 )
666 .bind(project_id)
667 .bind(member_id)
668 .fetch_optional(&self.pool)
669 .await?;
670 Ok(row.and_then(|r| r.try_get("wrapped_project_token").ok()))
671 }
672
673 pub async fn get_project_member_role(
674 &self,
675 project_id: &str,
676 member_id: &str,
677 ) -> Result<Option<String>, sqlx::Error> {
678 let row =
679 sqlx::query("SELECT role FROM project_members WHERE project_id = ? AND member_id = ?")
680 .bind(project_id)
681 .bind(member_id)
682 .fetch_optional(&self.pool)
683 .await?;
684
685 Ok(row.map(|r| r.try_get("role").unwrap_or_default()))
686 }
687
688 pub async fn approve_project_request_tx(
689 &self,
690 req: ApproveProjectRequest<'_>,
691 ) -> Result<(), sqlx::Error> {
692 let mut tx = self.pool.begin().await?;
693 let now = time::OffsetDateTime::now_utc().to_string();
694
695 sqlx::query(
696 "INSERT INTO projects (project_id, revision, created_at, updated_at) VALUES (?, 0, ?, ?)"
697 )
698 .bind(req.project_id)
699 .bind(&now)
700 .bind(&now)
701 .execute(&mut *tx)
702 .await?;
703
704 sqlx::query(
705 "INSERT INTO project_members (project_id, member_id, name, role, status, recipient, claim_secret_hash, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"
706 )
707 .bind(req.project_id)
708 .bind(req.requester_member_id)
709 .bind(req.requester_name)
710 .bind("admin")
711 .bind("active")
712 .bind(req.requester_recipient)
713 .bind(req.claim_secret_hash)
714 .bind(&now)
715 .bind(&now)
716 .execute(&mut *tx)
717 .await?;
718
719 sqlx::query(
720 "INSERT INTO project_tokens (project_id, token_id, token_hash, capabilities_json, member_id, status, created_at) VALUES (?, ?, ?, ?, ?, 'active', ?)"
721 )
722 .bind(req.project_id)
723 .bind(req.token_id)
724 .bind(req.token_hash)
725 .bind(req.caps_json)
726 .bind(req.requester_member_id)
727 .bind(&now)
728 .execute(&mut *tx)
729 .await?;
730
731 sqlx::query(
732 "UPDATE project_members SET wrapped_project_token = ? WHERE project_id = ? AND member_id = ?"
733 )
734 .bind(req.wrapped_b64)
735 .bind(req.project_id)
736 .bind(req.requester_member_id)
737 .execute(&mut *tx)
738 .await?;
739
740 sqlx::query("DELETE FROM project_requests WHERE project_id = ?")
741 .bind(req.project_id)
742 .execute(&mut *tx)
743 .await?;
744
745 tx.commit().await?;
746 Ok(())
747 }
748
749 pub async fn delete_project(&self, project_id: &str) -> Result<(), sqlx::Error> {
750 sqlx::query("DELETE FROM projects WHERE project_id = ?")
751 .bind(project_id)
752 .execute(&self.pool)
753 .await?;
754 Ok(())
755 }
756
757 pub async fn list_projects(
758 &self,
759 ) -> Result<Vec<(String, i64, Option<String>, String)>, sqlx::Error> {
760 let rows = sqlx::query("SELECT project_id, revision, kagi_json, created_at FROM projects")
761 .fetch_all(&self.pool)
762 .await?;
763
764 Ok(rows
765 .into_iter()
766 .map(|r| {
767 (
768 r.try_get("project_id").unwrap_or_default(),
769 r.try_get("revision").unwrap_or_default(),
770 r.try_get("kagi_json").ok(),
771 r.try_get("created_at").unwrap_or_default(),
772 )
773 })
774 .collect())
775 }
776
777 #[allow(clippy::too_many_arguments)]
778 pub async fn create_audit_event(
779 &self,
780 event_id: &str,
781 project_id: Option<&str>,
782 actor_member_id: Option<&str>,
783 actor_token_id: Option<&str>,
784 event_type: &str,
785 request_id: Option<&str>,
786 remote_addr: Option<&str>,
787 metadata_json: Option<&str>,
788 ) -> Result<(), sqlx::Error> {
789 let now = time::OffsetDateTime::now_utc().to_string();
790 sqlx::query(
791 "INSERT INTO audit_events (event_id, created_at, project_id, actor_member_id, actor_token_id, event_type, request_id, remote_addr, metadata_json) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"
792 )
793 .bind(event_id)
794 .bind(&now)
795 .bind(project_id)
796 .bind(actor_member_id)
797 .bind(actor_token_id)
798 .bind(event_type)
799 .bind(request_id)
800 .bind(remote_addr)
801 .bind(metadata_json)
802 .execute(&self.pool)
803 .await?;
804 Ok(())
805 }
806
807 #[allow(dead_code)]
808 pub async fn list_audit_events(
809 &self,
810 project_id: Option<&str>,
811 limit: i64,
812 ) -> Result<
813 Vec<(
814 String,
815 String,
816 Option<String>,
817 Option<String>,
818 Option<String>,
819 String,
820 Option<String>,
821 Option<String>,
822 Option<String>,
823 )>,
824 sqlx::Error,
825 > {
826 let rows = if let Some(pid) = project_id {
827 sqlx::query("SELECT event_id, created_at, project_id, actor_member_id, actor_token_id, event_type, request_id, remote_addr, metadata_json FROM audit_events WHERE project_id = ? ORDER BY created_at DESC LIMIT ?")
828 .bind(pid)
829 .bind(limit)
830 .fetch_all(&self.pool)
831 .await?
832 } else {
833 sqlx::query("SELECT event_id, created_at, project_id, actor_member_id, actor_token_id, event_type, request_id, remote_addr, metadata_json FROM audit_events ORDER BY created_at DESC LIMIT ?")
834 .bind(limit)
835 .fetch_all(&self.pool)
836 .await?
837 };
838
839 Ok(rows
840 .into_iter()
841 .map(|r| {
842 (
843 r.try_get("event_id").unwrap_or_default(),
844 r.try_get("created_at").unwrap_or_default(),
845 r.try_get("project_id").ok(),
846 r.try_get("actor_member_id").ok(),
847 r.try_get("actor_token_id").ok(),
848 r.try_get("event_type").unwrap_or_default(),
849 r.try_get("request_id").ok(),
850 r.try_get("remote_addr").ok(),
851 r.try_get("metadata_json").ok(),
852 )
853 })
854 .collect())
855 }
856
857 pub async fn get_metrics(&self) -> Result<(i64, i64, i64, i64), sqlx::Error> {
858 let projects: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM projects")
859 .fetch_one(&self.pool)
860 .await?;
861 let tokens: i64 =
862 sqlx::query_scalar("SELECT COUNT(*) FROM project_tokens WHERE status != 'revoked'")
863 .fetch_one(&self.pool)
864 .await?;
865 let admins: i64 =
866 sqlx::query_scalar("SELECT COUNT(*) FROM admin_tokens WHERE status = 'active'")
867 .fetch_one(&self.pool)
868 .await?;
869 let db_size: i64 = sqlx::query_scalar(
870 "SELECT page_count * page_size FROM pragma_page_count(), pragma_page_size()",
871 )
872 .fetch_one(&self.pool)
873 .await?;
874 Ok((projects, tokens, admins, db_size))
875 }
876}
877
878#[cfg(test)]
879mod tests {
880 use super::*;
881 use kagi_sync::domain::project_state::ProjectFile;
882
883 async fn test_repo() -> SqliteRemoteRepository {
884 let id = rand::random::<u64>();
885 let path = std::env::temp_dir().join(format!("kagi_test_{}.db", id));
886 SqliteRemoteRepository::new_file(path).await.unwrap()
887 }
888
889 #[tokio::test]
890 async fn test_create_project_and_pull() {
891 let repo = test_repo().await;
892 repo.create_project("kgp_test").await.unwrap();
893
894 let result = repo.pull_project_state("kgp_test").await.unwrap();
895 assert!(result.is_some());
896 let (revision, files) = result.unwrap();
897 assert_eq!(revision, 0);
898 assert!(files.is_empty());
899 }
900
901 #[tokio::test]
902 async fn test_create_project_duplicate_fails() {
903 let repo = test_repo().await;
904 repo.create_project("kgp_test").await.unwrap();
905 let err = repo.create_project("kgp_test").await.unwrap_err();
906 assert!(
907 err.as_database_error()
908 .map(|d| d.is_unique_violation())
909 .unwrap_or(false)
910 );
911 }
912
913 #[tokio::test]
914 async fn test_authenticate_token() {
915 let repo = test_repo().await;
916 repo.create_project("kgp_test").await.unwrap();
917 repo.create_token(
918 "kgp_test",
919 "kgt_123",
920 "hash_correct",
921 "[\"pull\"]",
922 Some("kgm_alice"),
923 "active",
924 )
925 .await
926 .unwrap();
927
928 let found = repo
929 .authenticate_token("kgp_test", "hash_correct")
930 .await
931 .unwrap();
932 assert!(found.is_some());
933 let (token_id, caps, member_id) = found.unwrap();
934 assert_eq!(token_id, "kgt_123");
935 assert_eq!(caps, vec!["pull"]);
936 assert_eq!(member_id, Some("kgm_alice".to_string()));
937
938 let not_found = repo
939 .authenticate_token("kgp_test", "hash_wrong")
940 .await
941 .unwrap();
942 assert!(not_found.is_none());
943 }
944
945 #[tokio::test]
946 async fn test_push_and_pull_project_state() {
947 let repo = test_repo().await;
948 repo.create_project("kgp_test").await.unwrap();
949
950 let files = vec![ProjectFile {
951 path: "dev.env".into(),
952 content: "KEY=val".into(),
953 sha256: Some("abc".into()),
954 }];
955 let new_rev = repo
956 .push_project_state(PushProjectStateRequest {
957 project_id: "kgp_test",
958 base_revision: 0,
959 kagi_json: "{}",
960 access_json: "{}",
961 files: &files,
962 activate_tokens: &[],
963 revoke_tokens: &[],
964 accepted_joins: &[],
965 manifest_json: None,
966 manifest_signature: None,
967 })
968 .await
969 .unwrap();
970 assert_eq!(new_rev, 1);
971
972 let result = repo.pull_project_state("kgp_test").await.unwrap();
973 let (revision, pulled_files) = result.unwrap();
974 assert_eq!(revision, 1);
975 assert_eq!(pulled_files.len(), 1);
976 assert_eq!(pulled_files[0].path, "dev.env");
977 assert_eq!(pulled_files[0].content, "KEY=val");
978 assert_eq!(pulled_files[0].sha256, Some("abc".to_string()));
979 }
980
981 #[tokio::test]
982 async fn test_push_conflict() {
983 let repo = test_repo().await;
984 repo.create_project("kgp_test").await.unwrap();
985
986 let err = repo
987 .push_project_state(PushProjectStateRequest {
988 project_id: "kgp_test",
989 base_revision: 99,
990 kagi_json: "{}",
991 access_json: "{}",
992 files: &[],
993 activate_tokens: &[],
994 revoke_tokens: &[],
995 accepted_joins: &[],
996 manifest_json: None,
997 manifest_signature: None,
998 })
999 .await
1000 .unwrap_err();
1001 assert!(matches!(err, sqlx::Error::RowNotFound));
1002 }
1003
1004 #[tokio::test]
1005 async fn test_join_request_flow() {
1006 let repo = test_repo().await;
1007 repo.create_project("kgp_test").await.unwrap();
1008
1009 repo.upsert_join_request(UpsertJoinRequest {
1010 project_id: "kgp_test",
1011 member_id: "kgm_bob",
1012 request_token_id: "kgt_req1",
1013 name: "Bob",
1014 normalized_name: "bob",
1015 recipient: "age1...",
1016 signing_public_key: "signing-key",
1017 })
1018 .await
1019 .unwrap();
1020
1021 let pending = repo.list_join_requests("kgp_test").await.unwrap();
1022 assert_eq!(pending.len(), 1);
1023 assert_eq!(pending[0].0, "kgm_bob");
1024 assert_eq!(pending[0].1, "Bob");
1025 assert_eq!(pending[0].2, "age1...");
1026 assert_eq!(pending[0].3.as_deref(), Some("signing-key"));
1027 }
1028
1029 #[tokio::test]
1030 async fn test_revoke_token() {
1031 let repo = test_repo().await;
1032 repo.create_project("kgp_test").await.unwrap();
1033 repo.create_token(
1034 "kgp_test",
1035 "kgt_123",
1036 "hash_value",
1037 "[\"pull\"]",
1038 None,
1039 "active",
1040 )
1041 .await
1042 .unwrap();
1043
1044 repo.revoke_tokens("kgp_test", &["kgt_123".into()])
1045 .await
1046 .unwrap();
1047
1048 let found = repo
1049 .authenticate_token("kgp_test", "hash_value")
1050 .await
1051 .unwrap();
1052 assert!(found.is_none());
1053 }
1054
1055 #[tokio::test]
1056 async fn test_get_project_meta() {
1057 let repo = test_repo().await;
1058 repo.create_project("kgp_test").await.unwrap();
1059
1060 let files = vec![ProjectFile {
1061 path: "a".into(),
1062 content: "b".into(),
1063 sha256: None,
1064 }];
1065 repo.push_project_state(PushProjectStateRequest {
1066 project_id: "kgp_test",
1067 base_revision: 0,
1068 kagi_json: "{\"k\":1}",
1069 access_json: "{\"a\":2}",
1070 files: &files,
1071 activate_tokens: &[],
1072 revoke_tokens: &[],
1073 accepted_joins: &[],
1074 manifest_json: None,
1075 manifest_signature: None,
1076 })
1077 .await
1078 .unwrap();
1079
1080 let meta = repo.get_project_meta("kgp_test").await.unwrap();
1081 assert!(meta.is_some());
1082 let (kagi_json, access_json) = meta.unwrap();
1083 assert_eq!(kagi_json, Some("{\"k\":1}".to_string()));
1084 assert_eq!(access_json, Some("{\"a\":2}".to_string()));
1085 }
1086
1087 #[tokio::test]
1088 async fn test_admin_token_lifecycle() {
1089 let repo = test_repo().await;
1090 assert!(!repo.has_admin_token().await.unwrap());
1091
1092 let created_at = time::OffsetDateTime::now_utc().to_string();
1093 repo.create_admin_token("kat_123", "hash_admin", "[\"admin\"]", &created_at)
1094 .await
1095 .unwrap();
1096
1097 assert!(repo.has_admin_token().await.unwrap());
1098
1099 let found = repo.authenticate_admin_token("hash_admin").await.unwrap();
1100 assert!(found.is_some());
1101 let (token_id, caps) = found.unwrap();
1102 assert_eq!(token_id, "kat_123");
1103 assert_eq!(caps, vec!["admin"]);
1104 }
1105
1106 #[tokio::test]
1107 async fn test_authenticate_admin_token_wrong_hash() {
1108 let repo = test_repo().await;
1109 let created_at = time::OffsetDateTime::now_utc().to_string();
1110 repo.create_admin_token("kat_123", "hash_correct", "[\"admin\"]", &created_at)
1111 .await
1112 .unwrap();
1113
1114 let not_found = repo.authenticate_admin_token("hash_wrong").await.unwrap();
1115 assert!(not_found.is_none());
1116 }
1117
1118 #[tokio::test]
1119 async fn test_project_request_lifecycle() {
1120 let repo = test_repo().await;
1121 repo.create_project_request(
1122 "kgp_req",
1123 "kgm_alice",
1124 "Alice",
1125 "age1...",
1126 "cs:test",
1127 Some("{\"key\":1}"),
1128 )
1129 .await
1130 .unwrap();
1131
1132 let requests = repo.list_project_requests().await.unwrap();
1133 assert_eq!(requests.len(), 1);
1134 let (project_id, member_id, name, recipient, _hash, kagi_json, status) = &requests[0];
1135 assert_eq!(project_id, "kgp_req");
1136 assert_eq!(member_id, "kgm_alice");
1137 assert_eq!(name, "Alice");
1138 assert_eq!(recipient, "age1...");
1139 assert_eq!(kagi_json.as_deref(), Some("{\"key\":1}"));
1140 assert_eq!(status, "pending");
1141
1142 let single = repo.get_project_request("kgp_req").await.unwrap();
1143 assert!(single.is_some());
1144 let (project_id2, member_id2, name2, recipient2, _hash2, kagi_json2, status2) =
1145 single.unwrap();
1146 assert_eq!(project_id2, "kgp_req");
1147 assert_eq!(member_id2, "kgm_alice");
1148 assert_eq!(name2, "Alice");
1149 assert_eq!(recipient2, "age1...");
1150 assert_eq!(kagi_json2.as_deref(), Some("{\"key\":1}"));
1151 assert_eq!(status2, "pending");
1152
1153 repo.delete_project_request("kgp_req").await.unwrap();
1154 let after_delete = repo.list_project_requests().await.unwrap();
1155 assert!(after_delete.is_empty());
1156 }
1157
1158 #[tokio::test]
1159 async fn test_project_member_lifecycle() {
1160 let repo = test_repo().await;
1161 repo.create_project("kgp_test").await.unwrap();
1162
1163 repo.create_project_member(CreateProjectMemberRequest {
1164 project_id: "kgp_test",
1165 member_id: "kgm_bob",
1166 name: "Bob",
1167 role: "admin",
1168 status: "active",
1169 recipient: "age1...",
1170 claim_secret_hash: "cs:test",
1171 })
1172 .await
1173 .unwrap();
1174
1175 let role = repo
1176 .get_project_member_role("kgp_test", "kgm_bob")
1177 .await
1178 .unwrap();
1179 assert_eq!(role, Some("admin".to_string()));
1180
1181 repo.delete_project("kgp_test").await.unwrap();
1182
1183 let role_after = repo
1184 .get_project_member_role("kgp_test", "kgm_bob")
1185 .await
1186 .unwrap();
1187 assert!(role_after.is_none());
1188 }
1189
1190 #[tokio::test]
1191 async fn test_list_projects() {
1192 let repo = test_repo().await;
1193 repo.create_project("kgp_a").await.unwrap();
1194 repo.create_project("kgp_b").await.unwrap();
1195
1196 let projects = repo.list_projects().await.unwrap();
1197 assert_eq!(projects.len(), 2);
1198 let ids: Vec<String> = projects.iter().map(|p| p.0.clone()).collect();
1199 assert!(ids.contains(&"kgp_a".to_string()));
1200 assert!(ids.contains(&"kgp_b".to_string()));
1201 }
1202
1203 #[tokio::test]
1204 async fn test_audit_event_lifecycle() {
1205 let repo = test_repo().await;
1206 repo.create_project("kgp_test").await.unwrap();
1207
1208 repo.create_audit_event(
1209 "kae_1",
1210 Some("kgp_test"),
1211 Some("kgm_alice"),
1212 Some("kgt_123"),
1213 "push",
1214 Some("kgr_1"),
1215 Some("127.0.0.1"),
1216 Some("{\"revision\":1}"),
1217 )
1218 .await
1219 .unwrap();
1220
1221 let events = repo.list_audit_events(Some("kgp_test"), 10).await.unwrap();
1222 assert_eq!(events.len(), 1);
1223 let (
1224 event_id,
1225 _created_at,
1226 project_id,
1227 actor_member_id,
1228 actor_token_id,
1229 event_type,
1230 request_id,
1231 remote_addr,
1232 metadata_json,
1233 ) = &events[0];
1234 assert_eq!(event_id, "kae_1");
1235 assert_eq!(project_id.as_deref(), Some("kgp_test"));
1236 assert_eq!(actor_member_id.as_deref(), Some("kgm_alice"));
1237 assert_eq!(actor_token_id.as_deref(), Some("kgt_123"));
1238 assert_eq!(event_type, "push");
1239 assert_eq!(request_id.as_deref(), Some("kgr_1"));
1240 assert_eq!(remote_addr.as_deref(), Some("127.0.0.1"));
1241 assert_eq!(metadata_json.as_deref(), Some("{\"revision\":1}"));
1242 }
1243
1244 #[tokio::test]
1245 async fn test_audit_event_does_not_leak_sensitive_data() {
1246 let repo = test_repo().await;
1247 repo.create_project("kgp_test").await.unwrap();
1248
1249 repo.create_audit_event(
1250 "kae_1",
1251 Some("kgp_test"),
1252 None,
1253 None,
1254 "project_request_created",
1255 Some("kgr_1"),
1256 Some("127.0.0.1"),
1257 Some("{\"requester_name\":\"Alice\"}"),
1258 )
1259 .await
1260 .unwrap();
1261
1262 let events = repo.list_audit_events(Some("kgp_test"), 10).await.unwrap();
1263 assert_eq!(events.len(), 1);
1264 let metadata_json = &events[0].8;
1265 let meta = metadata_json.as_deref().unwrap_or("");
1266 assert!(!meta.contains("secret"));
1267 assert!(!meta.contains("token"));
1268 assert!(!meta.contains("claim_secret"));
1269 }
1270
1271 #[tokio::test]
1272 async fn test_metrics() {
1273 let repo = test_repo().await;
1274 repo.create_project("kgp_test").await.unwrap();
1275 let (projects, tokens, admins, db_size) = repo.get_metrics().await.unwrap();
1276 assert_eq!(projects, 1);
1277 assert_eq!(tokens, 0);
1278 assert_eq!(admins, 0);
1279 assert!(db_size > 0);
1280 }
1281}