Skip to main content

kagi_server/
sqlite_remote.rs

1use kagi_sync::domain::project_state::ProjectFile;
2use sha2::Digest;
3use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
4use sqlx::{Pool, Row, Sqlite};
5use std::path::Path;
6use std::time::Duration;
7
8pub struct SqliteRemoteRepository {
9    pool: Pool<Sqlite>,
10}
11
12pub struct PushProjectStateRequest<'a> {
13    pub project_id: &'a str,
14    pub base_revision: i64,
15    pub kagi_json: &'a str,
16    pub access_json: &'a str,
17    pub files: &'a [ProjectFile],
18    pub activate_tokens: &'a [String],
19    pub revoke_tokens: &'a [String],
20    pub accepted_joins: &'a [String],
21    pub manifest_json: Option<&'a str>,
22    pub manifest_signature: Option<&'a str>,
23}
24
25pub struct CreateProjectMemberRequest<'a> {
26    pub project_id: &'a str,
27    pub member_id: &'a str,
28    pub name: &'a str,
29    pub role: &'a str,
30    pub status: &'a str,
31    pub recipient: &'a str,
32    pub claim_secret_hash: &'a str,
33}
34
35pub struct UpsertJoinRequest<'a> {
36    pub project_id: &'a str,
37    pub member_id: &'a str,
38    pub request_token_id: &'a str,
39    pub name: &'a str,
40    pub normalized_name: &'a str,
41    pub recipient: &'a str,
42    pub signing_public_key: &'a str,
43}
44
45pub struct ApproveProjectRequest<'a> {
46    pub project_id: &'a str,
47    pub requester_member_id: &'a str,
48    pub requester_name: &'a str,
49    pub requester_recipient: &'a str,
50    pub claim_secret_hash: &'a str,
51    pub token_id: &'a str,
52    pub token_hash: &'a str,
53    pub caps_json: &'a str,
54    pub wrapped_b64: &'a str,
55}
56
57impl SqliteRemoteRepository {
58    pub async fn new_file(path: impl AsRef<Path>) -> Result<Self, sqlx::Error> {
59        let opts = SqliteConnectOptions::new()
60            .filename(path.as_ref())
61            .create_if_missing(true);
62        Self::connect_with(opts).await
63    }
64
65    async fn connect_with(opts: SqliteConnectOptions) -> Result<Self, sqlx::Error> {
66        let pool = SqlitePoolOptions::new()
67            .max_connections(5)
68            .min_connections(1)
69            .acquire_timeout(Duration::from_secs(5))
70            .connect_with(opts)
71            .await?;
72
73        sqlx::query("PRAGMA foreign_keys = ON;")
74            .execute(&pool)
75            .await?;
76        sqlx::query("PRAGMA journal_mode = WAL;")
77            .execute(&pool)
78            .await?;
79        sqlx::query("PRAGMA synchronous = FULL;")
80            .execute(&pool)
81            .await?;
82        sqlx::query("PRAGMA busy_timeout = 5000;")
83            .execute(&pool)
84            .await?;
85
86        sqlx::migrate!("./migrations").run(&pool).await?;
87
88        Ok(Self { pool })
89    }
90
91    pub async fn create_project(&self, project_id: &str) -> Result<(), sqlx::Error> {
92        let now = time::OffsetDateTime::now_utc().to_string();
93        sqlx::query(
94            "INSERT INTO projects (project_id, revision, created_at, updated_at) VALUES (?, 0, ?, ?)"
95        )
96        .bind(project_id)
97        .bind(&now)
98        .bind(&now)
99        .execute(&self.pool)
100        .await?;
101        Ok(())
102    }
103
104    pub async fn create_token(
105        &self,
106        project_id: &str,
107        token_id: &str,
108        token_hash: &str,
109        capabilities_json: &str,
110        member_id: Option<&str>,
111        status: &str,
112    ) -> Result<(), sqlx::Error> {
113        let now = time::OffsetDateTime::now_utc().to_string();
114        sqlx::query(
115            "INSERT INTO project_tokens (project_id, token_id, token_hash, capabilities_json, member_id, status, created_at)
116            VALUES (?, ?, ?, ?, ?, ?, ?)"
117        )
118        .bind(project_id)
119        .bind(token_id)
120        .bind(token_hash)
121        .bind(capabilities_json)
122        .bind(member_id)
123        .bind(status)
124        .bind(&now)
125        .execute(&self.pool)
126        .await?;
127        Ok(())
128    }
129
130    pub async fn authenticate_token(
131        &self,
132        project_id: &str,
133        token_hash: &str,
134    ) -> Result<Option<(String, Vec<String>, Option<String>)>, sqlx::Error> {
135        let row = sqlx::query(
136            "SELECT token_id, capabilities_json, member_id FROM project_tokens
137             WHERE project_id = ? AND token_hash = ? AND status = 'active'",
138        )
139        .bind(project_id)
140        .bind(token_hash)
141        .fetch_optional(&self.pool)
142        .await?;
143
144        Ok(row.map(|r| {
145            let token_id: String = r.try_get("token_id").unwrap_or_default();
146            let caps_json: String = r.try_get("capabilities_json").unwrap_or_default();
147            let member_id: Option<String> = r.try_get("member_id").ok();
148            let caps: Vec<String> = serde_json::from_str(&caps_json).unwrap_or_default();
149            (token_id, caps, member_id)
150        }))
151    }
152
153    pub async fn push_project_state(
154        &self,
155        request: PushProjectStateRequest<'_>,
156    ) -> Result<i64, sqlx::Error> {
157        let mut tx = self.pool.begin().await?;
158
159        let current_row = sqlx::query("SELECT revision FROM projects WHERE project_id = ?")
160            .bind(request.project_id)
161            .fetch_optional(&mut *tx)
162            .await?;
163        let current_revision = current_row
164            .map(|r| r.try_get::<i64, _>("revision").unwrap_or(0))
165            .unwrap_or(0);
166        if current_revision != request.base_revision {
167            return Err(sqlx::Error::RowNotFound);
168        }
169
170        sqlx::query("DELETE FROM project_files WHERE project_id = ?")
171            .bind(request.project_id)
172            .execute(&mut *tx)
173            .await?;
174
175        let now = time::OffsetDateTime::now_utc().to_string();
176        for file in request.files {
177            sqlx::query(
178                "INSERT INTO project_files (project_id, path, content, sha256, updated_at) VALUES (?, ?, ?, ?, ?)"
179            )
180            .bind(request.project_id)
181            .bind(&file.path)
182            .bind(&file.content)
183            .bind(&file.sha256)
184            .bind(&now)
185            .execute(&mut *tx)
186            .await?;
187        }
188
189        let new_revision = request.base_revision + 1;
190        sqlx::query(
191            "UPDATE projects SET revision = ?, kagi_json = ?, access_json = ?, updated_at = ? WHERE project_id = ?"
192        )
193        .bind(new_revision)
194        .bind(request.kagi_json)
195        .bind(request.access_json)
196        .bind(&now)
197        .bind(request.project_id)
198        .execute(&mut *tx)
199        .await?;
200
201        for token_id in request.activate_tokens {
202            sqlx::query(
203                "UPDATE project_tokens SET status = 'active', activated_at = ? WHERE project_id = ? AND token_id = ? AND status = 'pending_activation'"
204            )
205            .bind(&now)
206            .bind(request.project_id)
207            .bind(token_id)
208            .execute(&mut *tx)
209            .await?;
210        }
211
212        for token_id in request.revoke_tokens {
213            sqlx::query(
214                "UPDATE project_tokens SET status = 'revoked', revoked_at = ? WHERE project_id = ? AND token_id = ?"
215            )
216            .bind(&now)
217            .bind(request.project_id)
218            .bind(token_id)
219            .execute(&mut *tx)
220            .await?;
221        }
222
223        for member_id in request.accepted_joins {
224            sqlx::query(
225                "UPDATE join_requests SET status = 'accepted', updated_at = ? WHERE project_id = ? AND member_id = ?"
226            )
227            .bind(&now)
228            .bind(request.project_id)
229            .bind(member_id)
230            .execute(&mut *tx)
231            .await?;
232        }
233
234        if let Some(manifest_json) = request.manifest_json {
235            let manifest_hash = {
236                let mut hasher = sha2::Sha256::new();
237                hasher.update(manifest_json.as_bytes());
238                hex::encode(hasher.finalize())
239            };
240            sqlx::query(
241                "INSERT INTO manifests (project_id, revision, manifest_hash, manifest_json, manifest_signature, created_at) VALUES (?, ?, ?, ?, ?, ?)"
242            )
243            .bind(request.project_id)
244            .bind(new_revision)
245            .bind(&manifest_hash)
246            .bind(manifest_json)
247            .bind(request.manifest_signature)
248            .bind(&now)
249            .execute(&mut *tx)
250            .await?;
251        }
252
253        tx.commit().await?;
254        Ok(new_revision)
255    }
256
257    pub async fn pull_project_state(
258        &self,
259        project_id: &str,
260    ) -> Result<Option<(i64, Vec<ProjectFile>)>, sqlx::Error> {
261        let revision_row = sqlx::query("SELECT revision FROM projects WHERE project_id = ?")
262            .bind(project_id)
263            .fetch_optional(&self.pool)
264            .await?;
265        let revision = match revision_row {
266            Some(r) => r.try_get::<i64, _>("revision")?,
267            None => return Ok(None),
268        };
269
270        let file_rows =
271            sqlx::query("SELECT path, content, sha256 FROM project_files WHERE project_id = ?")
272                .bind(project_id)
273                .fetch_all(&self.pool)
274                .await?;
275
276        let project_files = file_rows
277            .into_iter()
278            .map(|r| ProjectFile {
279                path: r.try_get("path").unwrap_or_default(),
280                content: r.try_get("content").unwrap_or_default(),
281                sha256: r.try_get("sha256").ok(),
282            })
283            .collect();
284
285        Ok(Some((revision, project_files)))
286    }
287
288    pub async fn get_manifest(
289        &self,
290        project_id: &str,
291        revision: i64,
292    ) -> Result<Option<(String, String, Option<String>)>, sqlx::Error> {
293        let row = sqlx::query(
294            "SELECT manifest_hash, manifest_json, manifest_signature FROM manifests WHERE project_id = ? AND revision = ?",
295        )
296        .bind(project_id)
297        .bind(revision)
298        .fetch_optional(&self.pool)
299        .await?;
300
301        Ok(row.map(|r| {
302            (
303                r.try_get("manifest_hash").unwrap_or_default(),
304                r.try_get("manifest_json").unwrap_or_default(),
305                r.try_get("manifest_signature").ok(),
306            )
307        }))
308    }
309
310    pub async fn list_join_requests(
311        &self,
312        project_id: &str,
313    ) -> Result<Vec<(String, String, String, Option<String>, String)>, sqlx::Error> {
314        let rows = sqlx::query(
315            "SELECT member_id, name, recipient, signing_public_key, created_at FROM join_requests
316             WHERE project_id = ? AND status = 'pending'",
317        )
318        .bind(project_id)
319        .fetch_all(&self.pool)
320        .await?;
321
322        Ok(rows
323            .into_iter()
324            .map(|r| {
325                (
326                    r.try_get("member_id").unwrap_or_default(),
327                    r.try_get("name").unwrap_or_default(),
328                    r.try_get("recipient").unwrap_or_default(),
329                    r.try_get("signing_public_key").ok(),
330                    r.try_get("created_at").unwrap_or_default(),
331                )
332            })
333            .collect())
334    }
335
336    pub async fn upsert_join_request(
337        &self,
338        request: UpsertJoinRequest<'_>,
339    ) -> Result<(), sqlx::Error> {
340        let now = time::OffsetDateTime::now_utc().to_string();
341        sqlx::query(
342            "INSERT INTO join_requests (project_id, member_id, request_token_id, name, normalized_name, recipient, signing_public_key, status, created_at, updated_at)
343             VALUES (?, ?, ?, ?, ?, ?, ?, 'pending', ?, ?)
344             ON CONFLICT(project_id, member_id) DO UPDATE SET
345             request_token_id = excluded.request_token_id,
346             name = excluded.name,
347             normalized_name = excluded.normalized_name,
348             recipient = excluded.recipient,
349             signing_public_key = excluded.signing_public_key,
350             updated_at = excluded.updated_at
351             WHERE join_requests.request_token_id = excluded.request_token_id"
352        )
353        .bind(request.project_id)
354        .bind(request.member_id)
355        .bind(request.request_token_id)
356        .bind(request.name)
357        .bind(request.normalized_name)
358        .bind(request.recipient)
359        .bind(request.signing_public_key)
360        .bind(&now)
361        .bind(&now)
362        .execute(&self.pool)
363        .await?;
364        Ok(())
365    }
366
367    pub async fn revoke_tokens(
368        &self,
369        project_id: &str,
370        token_ids: &[String],
371    ) -> Result<(), sqlx::Error> {
372        let now = time::OffsetDateTime::now_utc().to_string();
373        for token_id in token_ids {
374            sqlx::query(
375                "UPDATE project_tokens SET status = 'revoked', revoked_at = ? WHERE project_id = ? AND token_id = ?"
376            )
377            .bind(&now)
378            .bind(project_id)
379            .bind(token_id)
380            .execute(&self.pool)
381            .await?;
382        }
383        Ok(())
384    }
385
386    pub async fn list_project_tokens(
387        &self,
388        project_id: &str,
389    ) -> Result<
390        Vec<(
391            String,
392            String,
393            Option<String>,
394            String,
395            String,
396            Option<String>,
397            Option<String>,
398        )>,
399        sqlx::Error,
400    > {
401        let rows = sqlx::query(
402            "SELECT token_id, capabilities_json, member_id, status, created_at, activated_at, revoked_at FROM project_tokens WHERE project_id = ? ORDER BY created_at DESC"
403        )
404        .bind(project_id)
405        .fetch_all(&self.pool)
406        .await?;
407
408        Ok(rows
409            .into_iter()
410            .map(|r| {
411                (
412                    r.try_get("token_id").unwrap_or_default(),
413                    r.try_get("capabilities_json").unwrap_or_default(),
414                    r.try_get("member_id").ok(),
415                    r.try_get("status").unwrap_or_default(),
416                    r.try_get("created_at").unwrap_or_default(),
417                    r.try_get("activated_at").ok(),
418                    r.try_get("revoked_at").ok(),
419                )
420            })
421            .collect())
422    }
423
424    pub async fn get_project_meta(
425        &self,
426        project_id: &str,
427    ) -> Result<Option<(Option<String>, Option<String>)>, sqlx::Error> {
428        let row = sqlx::query("SELECT kagi_json, access_json FROM projects WHERE project_id = ?")
429            .bind(project_id)
430            .fetch_optional(&self.pool)
431            .await?;
432
433        match row {
434            Some(r) => {
435                let k = r.try_get::<Option<String>, _>("kagi_json")?;
436                let a = r.try_get::<Option<String>, _>("access_json")?;
437                Ok(Some((k, a)))
438            }
439            None => Ok(None),
440        }
441    }
442
443    pub async fn has_admin_token(&self) -> Result<bool, sqlx::Error> {
444        let row = sqlx::query("SELECT COUNT(*) as cnt FROM admin_tokens WHERE status = 'active'")
445            .fetch_one(&self.pool)
446            .await?;
447        let count: i64 = row.try_get("cnt").unwrap_or(0);
448        Ok(count > 0)
449    }
450
451    pub async fn create_admin_token(
452        &self,
453        token_id: &str,
454        token_hash: &str,
455        capabilities_json: &str,
456        created_at: &str,
457    ) -> Result<(), sqlx::Error> {
458        sqlx::query(
459            "INSERT INTO admin_tokens (token_id, token_hash, capabilities_json, status, created_at) VALUES (?, ?, ?, 'active', ?)"
460        )
461        .bind(token_id)
462        .bind(token_hash)
463        .bind(capabilities_json)
464        .bind(created_at)
465        .execute(&self.pool)
466        .await?;
467        Ok(())
468    }
469
470    pub async fn authenticate_admin_token(
471        &self,
472        token_hash: &str,
473    ) -> Result<Option<(String, Vec<String>)>, sqlx::Error> {
474        let row = sqlx::query(
475            "SELECT token_id, capabilities_json FROM admin_tokens WHERE token_hash = ? AND status = 'active'"
476        )
477        .bind(token_hash)
478        .fetch_optional(&self.pool)
479        .await?;
480
481        Ok(row.map(|r| {
482            let token_id: String = r.try_get("token_id").unwrap_or_default();
483            let caps_json: String = r.try_get("capabilities_json").unwrap_or_default();
484            let caps: Vec<String> = serde_json::from_str(&caps_json).unwrap_or_default();
485            (token_id, caps)
486        }))
487    }
488
489    pub async fn create_project_request(
490        &self,
491        project_id: &str,
492        requester_member_id: &str,
493        requester_name: &str,
494        requester_recipient: &str,
495        claim_secret_hash: &str,
496        kagi_json: Option<&str>,
497    ) -> Result<(), sqlx::Error> {
498        let now = time::OffsetDateTime::now_utc().to_string();
499        sqlx::query(
500            "INSERT INTO project_requests (project_id, requester_member_id, requester_name, requester_recipient, claim_secret_hash, kagi_json, status, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, 'pending', ?, ?)"
501        )
502        .bind(project_id)
503        .bind(requester_member_id)
504        .bind(requester_name)
505        .bind(requester_recipient)
506        .bind(claim_secret_hash)
507        .bind(kagi_json)
508        .bind(&now)
509        .bind(&now)
510        .execute(&self.pool)
511        .await?;
512        Ok(())
513    }
514
515    pub async fn list_project_requests(
516        &self,
517    ) -> Result<
518        Vec<(
519            String,
520            String,
521            String,
522            String,
523            String,
524            Option<String>,
525            String,
526        )>,
527        sqlx::Error,
528    > {
529        let rows = sqlx::query(
530            "SELECT project_id, requester_member_id, requester_name, requester_recipient, claim_secret_hash, kagi_json, status FROM project_requests WHERE status = 'pending'"
531        )
532        .fetch_all(&self.pool)
533        .await?;
534
535        Ok(rows
536            .into_iter()
537            .map(|r| {
538                (
539                    r.try_get("project_id").unwrap_or_default(),
540                    r.try_get("requester_member_id").unwrap_or_default(),
541                    r.try_get("requester_name").unwrap_or_default(),
542                    r.try_get("requester_recipient").unwrap_or_default(),
543                    r.try_get("claim_secret_hash").unwrap_or_default(),
544                    r.try_get("kagi_json").ok(),
545                    r.try_get("status").unwrap_or_default(),
546                )
547            })
548            .collect())
549    }
550
551    pub async fn get_project_request(
552        &self,
553        project_id: &str,
554    ) -> Result<
555        Option<(
556            String,
557            String,
558            String,
559            String,
560            String,
561            Option<String>,
562            String,
563        )>,
564        sqlx::Error,
565    > {
566        let row = sqlx::query(
567            "SELECT project_id, requester_member_id, requester_name, requester_recipient, claim_secret_hash, kagi_json, status FROM project_requests WHERE project_id = ?"
568        )
569        .bind(project_id)
570        .fetch_optional(&self.pool)
571        .await?;
572
573        Ok(row.map(|r| {
574            (
575                r.try_get("project_id").unwrap_or_default(),
576                r.try_get("requester_member_id").unwrap_or_default(),
577                r.try_get("requester_name").unwrap_or_default(),
578                r.try_get("requester_recipient").unwrap_or_default(),
579                r.try_get("claim_secret_hash").unwrap_or_default(),
580                r.try_get("kagi_json").ok(),
581                r.try_get("status").unwrap_or_default(),
582            )
583        }))
584    }
585
586    #[allow(dead_code)]
587    pub async fn delete_project_request(&self, project_id: &str) -> Result<(), sqlx::Error> {
588        sqlx::query("DELETE FROM project_requests WHERE project_id = ?")
589            .bind(project_id)
590            .execute(&self.pool)
591            .await?;
592        Ok(())
593    }
594
595    #[allow(dead_code)]
596    pub async fn create_project_member(
597        &self,
598        req: CreateProjectMemberRequest<'_>,
599    ) -> Result<(), sqlx::Error> {
600        let now = time::OffsetDateTime::now_utc().to_string();
601        sqlx::query(
602            "INSERT INTO project_members (project_id, member_id, name, role, status, recipient, claim_secret_hash, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"
603        )
604        .bind(req.project_id)
605        .bind(req.member_id)
606        .bind(req.name)
607        .bind(req.role)
608        .bind(req.status)
609        .bind(req.recipient)
610        .bind(req.claim_secret_hash)
611        .bind(&now)
612        .bind(&now)
613        .execute(&self.pool)
614        .await?;
615        Ok(())
616    }
617
618    pub async fn get_project_member(
619        &self,
620        project_id: &str,
621        member_id: &str,
622    ) -> Result<Option<(String, String, String, String, String)>, sqlx::Error> {
623        let row = sqlx::query(
624            "SELECT name, role, status, recipient, claim_secret_hash FROM project_members WHERE project_id = ? AND member_id = ?"
625        )
626        .bind(project_id)
627        .bind(member_id)
628        .fetch_optional(&self.pool)
629        .await?;
630        Ok(row.map(|r| {
631            let name: String = r.try_get("name").unwrap_or_default();
632            let role: String = r.try_get("role").unwrap_or_default();
633            let status: String = r.try_get("status").unwrap_or_default();
634            let recipient: String = r.try_get("recipient").unwrap_or_default();
635            let claim_secret_hash: String = r.try_get("claim_secret_hash").unwrap_or_default();
636            (name, role, status, recipient, claim_secret_hash)
637        }))
638    }
639
640    #[allow(dead_code)]
641    pub async fn save_wrapped_project_token(
642        &self,
643        project_id: &str,
644        member_id: &str,
645        wrapped: &str,
646    ) -> Result<(), sqlx::Error> {
647        sqlx::query(
648            "UPDATE project_members SET wrapped_project_token = ? WHERE project_id = ? AND member_id = ?"
649        )
650        .bind(wrapped)
651        .bind(project_id)
652        .bind(member_id)
653        .execute(&self.pool)
654        .await?;
655        Ok(())
656    }
657
658    pub async fn get_wrapped_project_token(
659        &self,
660        project_id: &str,
661        member_id: &str,
662    ) -> Result<Option<String>, sqlx::Error> {
663        let row = sqlx::query(
664            "SELECT wrapped_project_token FROM project_members WHERE project_id = ? AND member_id = ?"
665        )
666        .bind(project_id)
667        .bind(member_id)
668        .fetch_optional(&self.pool)
669        .await?;
670        Ok(row.and_then(|r| r.try_get("wrapped_project_token").ok()))
671    }
672
673    pub async fn get_project_member_role(
674        &self,
675        project_id: &str,
676        member_id: &str,
677    ) -> Result<Option<String>, sqlx::Error> {
678        let row =
679            sqlx::query("SELECT role FROM project_members WHERE project_id = ? AND member_id = ?")
680                .bind(project_id)
681                .bind(member_id)
682                .fetch_optional(&self.pool)
683                .await?;
684
685        Ok(row.map(|r| r.try_get("role").unwrap_or_default()))
686    }
687
688    pub async fn approve_project_request_tx(
689        &self,
690        req: ApproveProjectRequest<'_>,
691    ) -> Result<(), sqlx::Error> {
692        let mut tx = self.pool.begin().await?;
693        let now = time::OffsetDateTime::now_utc().to_string();
694
695        sqlx::query(
696            "INSERT INTO projects (project_id, revision, created_at, updated_at) VALUES (?, 0, ?, ?)"
697        )
698        .bind(req.project_id)
699        .bind(&now)
700        .bind(&now)
701        .execute(&mut *tx)
702        .await?;
703
704        sqlx::query(
705            "INSERT INTO project_members (project_id, member_id, name, role, status, recipient, claim_secret_hash, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"
706        )
707        .bind(req.project_id)
708        .bind(req.requester_member_id)
709        .bind(req.requester_name)
710        .bind("admin")
711        .bind("active")
712        .bind(req.requester_recipient)
713        .bind(req.claim_secret_hash)
714        .bind(&now)
715        .bind(&now)
716        .execute(&mut *tx)
717        .await?;
718
719        sqlx::query(
720            "INSERT INTO project_tokens (project_id, token_id, token_hash, capabilities_json, member_id, status, created_at) VALUES (?, ?, ?, ?, ?, 'active', ?)"
721        )
722        .bind(req.project_id)
723        .bind(req.token_id)
724        .bind(req.token_hash)
725        .bind(req.caps_json)
726        .bind(req.requester_member_id)
727        .bind(&now)
728        .execute(&mut *tx)
729        .await?;
730
731        sqlx::query(
732            "UPDATE project_members SET wrapped_project_token = ? WHERE project_id = ? AND member_id = ?"
733        )
734        .bind(req.wrapped_b64)
735        .bind(req.project_id)
736        .bind(req.requester_member_id)
737        .execute(&mut *tx)
738        .await?;
739
740        sqlx::query("DELETE FROM project_requests WHERE project_id = ?")
741            .bind(req.project_id)
742            .execute(&mut *tx)
743            .await?;
744
745        tx.commit().await?;
746        Ok(())
747    }
748
749    pub async fn delete_project(&self, project_id: &str) -> Result<(), sqlx::Error> {
750        sqlx::query("DELETE FROM projects WHERE project_id = ?")
751            .bind(project_id)
752            .execute(&self.pool)
753            .await?;
754        Ok(())
755    }
756
757    pub async fn list_projects(
758        &self,
759    ) -> Result<Vec<(String, i64, Option<String>, String)>, sqlx::Error> {
760        let rows = sqlx::query("SELECT project_id, revision, kagi_json, created_at FROM projects")
761            .fetch_all(&self.pool)
762            .await?;
763
764        Ok(rows
765            .into_iter()
766            .map(|r| {
767                (
768                    r.try_get("project_id").unwrap_or_default(),
769                    r.try_get("revision").unwrap_or_default(),
770                    r.try_get("kagi_json").ok(),
771                    r.try_get("created_at").unwrap_or_default(),
772                )
773            })
774            .collect())
775    }
776
777    #[allow(clippy::too_many_arguments)]
778    pub async fn create_audit_event(
779        &self,
780        event_id: &str,
781        project_id: Option<&str>,
782        actor_member_id: Option<&str>,
783        actor_token_id: Option<&str>,
784        event_type: &str,
785        request_id: Option<&str>,
786        remote_addr: Option<&str>,
787        metadata_json: Option<&str>,
788    ) -> Result<(), sqlx::Error> {
789        let now = time::OffsetDateTime::now_utc().to_string();
790        sqlx::query(
791            "INSERT INTO audit_events (event_id, created_at, project_id, actor_member_id, actor_token_id, event_type, request_id, remote_addr, metadata_json) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"
792        )
793        .bind(event_id)
794        .bind(&now)
795        .bind(project_id)
796        .bind(actor_member_id)
797        .bind(actor_token_id)
798        .bind(event_type)
799        .bind(request_id)
800        .bind(remote_addr)
801        .bind(metadata_json)
802        .execute(&self.pool)
803        .await?;
804        Ok(())
805    }
806
807    #[allow(dead_code)]
808    pub async fn list_audit_events(
809        &self,
810        project_id: Option<&str>,
811        limit: i64,
812    ) -> Result<
813        Vec<(
814            String,
815            String,
816            Option<String>,
817            Option<String>,
818            Option<String>,
819            String,
820            Option<String>,
821            Option<String>,
822            Option<String>,
823        )>,
824        sqlx::Error,
825    > {
826        let rows = if let Some(pid) = project_id {
827            sqlx::query("SELECT event_id, created_at, project_id, actor_member_id, actor_token_id, event_type, request_id, remote_addr, metadata_json FROM audit_events WHERE project_id = ? ORDER BY created_at DESC LIMIT ?")
828                .bind(pid)
829                .bind(limit)
830                .fetch_all(&self.pool)
831                .await?
832        } else {
833            sqlx::query("SELECT event_id, created_at, project_id, actor_member_id, actor_token_id, event_type, request_id, remote_addr, metadata_json FROM audit_events ORDER BY created_at DESC LIMIT ?")
834                .bind(limit)
835                .fetch_all(&self.pool)
836                .await?
837        };
838
839        Ok(rows
840            .into_iter()
841            .map(|r| {
842                (
843                    r.try_get("event_id").unwrap_or_default(),
844                    r.try_get("created_at").unwrap_or_default(),
845                    r.try_get("project_id").ok(),
846                    r.try_get("actor_member_id").ok(),
847                    r.try_get("actor_token_id").ok(),
848                    r.try_get("event_type").unwrap_or_default(),
849                    r.try_get("request_id").ok(),
850                    r.try_get("remote_addr").ok(),
851                    r.try_get("metadata_json").ok(),
852                )
853            })
854            .collect())
855    }
856
857    pub async fn get_metrics(&self) -> Result<(i64, i64, i64, i64), sqlx::Error> {
858        let projects: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM projects")
859            .fetch_one(&self.pool)
860            .await?;
861        let tokens: i64 =
862            sqlx::query_scalar("SELECT COUNT(*) FROM project_tokens WHERE status != 'revoked'")
863                .fetch_one(&self.pool)
864                .await?;
865        let admins: i64 =
866            sqlx::query_scalar("SELECT COUNT(*) FROM admin_tokens WHERE status = 'active'")
867                .fetch_one(&self.pool)
868                .await?;
869        let db_size: i64 = sqlx::query_scalar(
870            "SELECT page_count * page_size FROM pragma_page_count(), pragma_page_size()",
871        )
872        .fetch_one(&self.pool)
873        .await?;
874        Ok((projects, tokens, admins, db_size))
875    }
876}
877
878#[cfg(test)]
879mod tests {
880    use super::*;
881    use kagi_sync::domain::project_state::ProjectFile;
882
883    async fn test_repo() -> SqliteRemoteRepository {
884        let id = rand::random::<u64>();
885        let path = std::env::temp_dir().join(format!("kagi_test_{}.db", id));
886        SqliteRemoteRepository::new_file(path).await.unwrap()
887    }
888
889    #[tokio::test]
890    async fn test_create_project_and_pull() {
891        let repo = test_repo().await;
892        repo.create_project("kgp_test").await.unwrap();
893
894        let result = repo.pull_project_state("kgp_test").await.unwrap();
895        assert!(result.is_some());
896        let (revision, files) = result.unwrap();
897        assert_eq!(revision, 0);
898        assert!(files.is_empty());
899    }
900
901    #[tokio::test]
902    async fn test_create_project_duplicate_fails() {
903        let repo = test_repo().await;
904        repo.create_project("kgp_test").await.unwrap();
905        let err = repo.create_project("kgp_test").await.unwrap_err();
906        assert!(
907            err.as_database_error()
908                .map(|d| d.is_unique_violation())
909                .unwrap_or(false)
910        );
911    }
912
913    #[tokio::test]
914    async fn test_authenticate_token() {
915        let repo = test_repo().await;
916        repo.create_project("kgp_test").await.unwrap();
917        repo.create_token(
918            "kgp_test",
919            "kgt_123",
920            "hash_correct",
921            "[\"pull\"]",
922            Some("kgm_alice"),
923            "active",
924        )
925        .await
926        .unwrap();
927
928        let found = repo
929            .authenticate_token("kgp_test", "hash_correct")
930            .await
931            .unwrap();
932        assert!(found.is_some());
933        let (token_id, caps, member_id) = found.unwrap();
934        assert_eq!(token_id, "kgt_123");
935        assert_eq!(caps, vec!["pull"]);
936        assert_eq!(member_id, Some("kgm_alice".to_string()));
937
938        let not_found = repo
939            .authenticate_token("kgp_test", "hash_wrong")
940            .await
941            .unwrap();
942        assert!(not_found.is_none());
943    }
944
945    #[tokio::test]
946    async fn test_push_and_pull_project_state() {
947        let repo = test_repo().await;
948        repo.create_project("kgp_test").await.unwrap();
949
950        let files = vec![ProjectFile {
951            path: "dev.env".into(),
952            content: "KEY=val".into(),
953            sha256: Some("abc".into()),
954        }];
955        let new_rev = repo
956            .push_project_state(PushProjectStateRequest {
957                project_id: "kgp_test",
958                base_revision: 0,
959                kagi_json: "{}",
960                access_json: "{}",
961                files: &files,
962                activate_tokens: &[],
963                revoke_tokens: &[],
964                accepted_joins: &[],
965                manifest_json: None,
966                manifest_signature: None,
967            })
968            .await
969            .unwrap();
970        assert_eq!(new_rev, 1);
971
972        let result = repo.pull_project_state("kgp_test").await.unwrap();
973        let (revision, pulled_files) = result.unwrap();
974        assert_eq!(revision, 1);
975        assert_eq!(pulled_files.len(), 1);
976        assert_eq!(pulled_files[0].path, "dev.env");
977        assert_eq!(pulled_files[0].content, "KEY=val");
978        assert_eq!(pulled_files[0].sha256, Some("abc".to_string()));
979    }
980
981    #[tokio::test]
982    async fn test_push_conflict() {
983        let repo = test_repo().await;
984        repo.create_project("kgp_test").await.unwrap();
985
986        let err = repo
987            .push_project_state(PushProjectStateRequest {
988                project_id: "kgp_test",
989                base_revision: 99,
990                kagi_json: "{}",
991                access_json: "{}",
992                files: &[],
993                activate_tokens: &[],
994                revoke_tokens: &[],
995                accepted_joins: &[],
996                manifest_json: None,
997                manifest_signature: None,
998            })
999            .await
1000            .unwrap_err();
1001        assert!(matches!(err, sqlx::Error::RowNotFound));
1002    }
1003
1004    #[tokio::test]
1005    async fn test_join_request_flow() {
1006        let repo = test_repo().await;
1007        repo.create_project("kgp_test").await.unwrap();
1008
1009        repo.upsert_join_request(UpsertJoinRequest {
1010            project_id: "kgp_test",
1011            member_id: "kgm_bob",
1012            request_token_id: "kgt_req1",
1013            name: "Bob",
1014            normalized_name: "bob",
1015            recipient: "age1...",
1016            signing_public_key: "signing-key",
1017        })
1018        .await
1019        .unwrap();
1020
1021        let pending = repo.list_join_requests("kgp_test").await.unwrap();
1022        assert_eq!(pending.len(), 1);
1023        assert_eq!(pending[0].0, "kgm_bob");
1024        assert_eq!(pending[0].1, "Bob");
1025        assert_eq!(pending[0].2, "age1...");
1026        assert_eq!(pending[0].3.as_deref(), Some("signing-key"));
1027    }
1028
1029    #[tokio::test]
1030    async fn test_revoke_token() {
1031        let repo = test_repo().await;
1032        repo.create_project("kgp_test").await.unwrap();
1033        repo.create_token(
1034            "kgp_test",
1035            "kgt_123",
1036            "hash_value",
1037            "[\"pull\"]",
1038            None,
1039            "active",
1040        )
1041        .await
1042        .unwrap();
1043
1044        repo.revoke_tokens("kgp_test", &["kgt_123".into()])
1045            .await
1046            .unwrap();
1047
1048        let found = repo
1049            .authenticate_token("kgp_test", "hash_value")
1050            .await
1051            .unwrap();
1052        assert!(found.is_none());
1053    }
1054
1055    #[tokio::test]
1056    async fn test_get_project_meta() {
1057        let repo = test_repo().await;
1058        repo.create_project("kgp_test").await.unwrap();
1059
1060        let files = vec![ProjectFile {
1061            path: "a".into(),
1062            content: "b".into(),
1063            sha256: None,
1064        }];
1065        repo.push_project_state(PushProjectStateRequest {
1066            project_id: "kgp_test",
1067            base_revision: 0,
1068            kagi_json: "{\"k\":1}",
1069            access_json: "{\"a\":2}",
1070            files: &files,
1071            activate_tokens: &[],
1072            revoke_tokens: &[],
1073            accepted_joins: &[],
1074            manifest_json: None,
1075            manifest_signature: None,
1076        })
1077        .await
1078        .unwrap();
1079
1080        let meta = repo.get_project_meta("kgp_test").await.unwrap();
1081        assert!(meta.is_some());
1082        let (kagi_json, access_json) = meta.unwrap();
1083        assert_eq!(kagi_json, Some("{\"k\":1}".to_string()));
1084        assert_eq!(access_json, Some("{\"a\":2}".to_string()));
1085    }
1086
1087    #[tokio::test]
1088    async fn test_admin_token_lifecycle() {
1089        let repo = test_repo().await;
1090        assert!(!repo.has_admin_token().await.unwrap());
1091
1092        let created_at = time::OffsetDateTime::now_utc().to_string();
1093        repo.create_admin_token("kat_123", "hash_admin", "[\"admin\"]", &created_at)
1094            .await
1095            .unwrap();
1096
1097        assert!(repo.has_admin_token().await.unwrap());
1098
1099        let found = repo.authenticate_admin_token("hash_admin").await.unwrap();
1100        assert!(found.is_some());
1101        let (token_id, caps) = found.unwrap();
1102        assert_eq!(token_id, "kat_123");
1103        assert_eq!(caps, vec!["admin"]);
1104    }
1105
1106    #[tokio::test]
1107    async fn test_authenticate_admin_token_wrong_hash() {
1108        let repo = test_repo().await;
1109        let created_at = time::OffsetDateTime::now_utc().to_string();
1110        repo.create_admin_token("kat_123", "hash_correct", "[\"admin\"]", &created_at)
1111            .await
1112            .unwrap();
1113
1114        let not_found = repo.authenticate_admin_token("hash_wrong").await.unwrap();
1115        assert!(not_found.is_none());
1116    }
1117
1118    #[tokio::test]
1119    async fn test_project_request_lifecycle() {
1120        let repo = test_repo().await;
1121        repo.create_project_request(
1122            "kgp_req",
1123            "kgm_alice",
1124            "Alice",
1125            "age1...",
1126            "cs:test",
1127            Some("{\"key\":1}"),
1128        )
1129        .await
1130        .unwrap();
1131
1132        let requests = repo.list_project_requests().await.unwrap();
1133        assert_eq!(requests.len(), 1);
1134        let (project_id, member_id, name, recipient, _hash, kagi_json, status) = &requests[0];
1135        assert_eq!(project_id, "kgp_req");
1136        assert_eq!(member_id, "kgm_alice");
1137        assert_eq!(name, "Alice");
1138        assert_eq!(recipient, "age1...");
1139        assert_eq!(kagi_json.as_deref(), Some("{\"key\":1}"));
1140        assert_eq!(status, "pending");
1141
1142        let single = repo.get_project_request("kgp_req").await.unwrap();
1143        assert!(single.is_some());
1144        let (project_id2, member_id2, name2, recipient2, _hash2, kagi_json2, status2) =
1145            single.unwrap();
1146        assert_eq!(project_id2, "kgp_req");
1147        assert_eq!(member_id2, "kgm_alice");
1148        assert_eq!(name2, "Alice");
1149        assert_eq!(recipient2, "age1...");
1150        assert_eq!(kagi_json2.as_deref(), Some("{\"key\":1}"));
1151        assert_eq!(status2, "pending");
1152
1153        repo.delete_project_request("kgp_req").await.unwrap();
1154        let after_delete = repo.list_project_requests().await.unwrap();
1155        assert!(after_delete.is_empty());
1156    }
1157
1158    #[tokio::test]
1159    async fn test_project_member_lifecycle() {
1160        let repo = test_repo().await;
1161        repo.create_project("kgp_test").await.unwrap();
1162
1163        repo.create_project_member(CreateProjectMemberRequest {
1164            project_id: "kgp_test",
1165            member_id: "kgm_bob",
1166            name: "Bob",
1167            role: "admin",
1168            status: "active",
1169            recipient: "age1...",
1170            claim_secret_hash: "cs:test",
1171        })
1172        .await
1173        .unwrap();
1174
1175        let role = repo
1176            .get_project_member_role("kgp_test", "kgm_bob")
1177            .await
1178            .unwrap();
1179        assert_eq!(role, Some("admin".to_string()));
1180
1181        repo.delete_project("kgp_test").await.unwrap();
1182
1183        let role_after = repo
1184            .get_project_member_role("kgp_test", "kgm_bob")
1185            .await
1186            .unwrap();
1187        assert!(role_after.is_none());
1188    }
1189
1190    #[tokio::test]
1191    async fn test_list_projects() {
1192        let repo = test_repo().await;
1193        repo.create_project("kgp_a").await.unwrap();
1194        repo.create_project("kgp_b").await.unwrap();
1195
1196        let projects = repo.list_projects().await.unwrap();
1197        assert_eq!(projects.len(), 2);
1198        let ids: Vec<String> = projects.iter().map(|p| p.0.clone()).collect();
1199        assert!(ids.contains(&"kgp_a".to_string()));
1200        assert!(ids.contains(&"kgp_b".to_string()));
1201    }
1202
1203    #[tokio::test]
1204    async fn test_audit_event_lifecycle() {
1205        let repo = test_repo().await;
1206        repo.create_project("kgp_test").await.unwrap();
1207
1208        repo.create_audit_event(
1209            "kae_1",
1210            Some("kgp_test"),
1211            Some("kgm_alice"),
1212            Some("kgt_123"),
1213            "push",
1214            Some("kgr_1"),
1215            Some("127.0.0.1"),
1216            Some("{\"revision\":1}"),
1217        )
1218        .await
1219        .unwrap();
1220
1221        let events = repo.list_audit_events(Some("kgp_test"), 10).await.unwrap();
1222        assert_eq!(events.len(), 1);
1223        let (
1224            event_id,
1225            _created_at,
1226            project_id,
1227            actor_member_id,
1228            actor_token_id,
1229            event_type,
1230            request_id,
1231            remote_addr,
1232            metadata_json,
1233        ) = &events[0];
1234        assert_eq!(event_id, "kae_1");
1235        assert_eq!(project_id.as_deref(), Some("kgp_test"));
1236        assert_eq!(actor_member_id.as_deref(), Some("kgm_alice"));
1237        assert_eq!(actor_token_id.as_deref(), Some("kgt_123"));
1238        assert_eq!(event_type, "push");
1239        assert_eq!(request_id.as_deref(), Some("kgr_1"));
1240        assert_eq!(remote_addr.as_deref(), Some("127.0.0.1"));
1241        assert_eq!(metadata_json.as_deref(), Some("{\"revision\":1}"));
1242    }
1243
1244    #[tokio::test]
1245    async fn test_audit_event_does_not_leak_sensitive_data() {
1246        let repo = test_repo().await;
1247        repo.create_project("kgp_test").await.unwrap();
1248
1249        repo.create_audit_event(
1250            "kae_1",
1251            Some("kgp_test"),
1252            None,
1253            None,
1254            "project_request_created",
1255            Some("kgr_1"),
1256            Some("127.0.0.1"),
1257            Some("{\"requester_name\":\"Alice\"}"),
1258        )
1259        .await
1260        .unwrap();
1261
1262        let events = repo.list_audit_events(Some("kgp_test"), 10).await.unwrap();
1263        assert_eq!(events.len(), 1);
1264        let metadata_json = &events[0].8;
1265        let meta = metadata_json.as_deref().unwrap_or("");
1266        assert!(!meta.contains("secret"));
1267        assert!(!meta.contains("token"));
1268        assert!(!meta.contains("claim_secret"));
1269    }
1270
1271    #[tokio::test]
1272    async fn test_metrics() {
1273        let repo = test_repo().await;
1274        repo.create_project("kgp_test").await.unwrap();
1275        let (projects, tokens, admins, db_size) = repo.get_metrics().await.unwrap();
1276        assert_eq!(projects, 1);
1277        assert_eq!(tokens, 0);
1278        assert_eq!(admins, 0);
1279        assert!(db_size > 0);
1280    }
1281}