1use chrono::{DateTime, Utc};
24use serde::{Deserialize, Serialize};
25use sqlx::{sqlite::SqlitePoolOptions, SqlitePool};
26
27#[derive(Clone)]
30pub struct Store {
31 pool: SqlitePool,
32}
33
34#[derive(Debug, Clone, Serialize, Deserialize)]
36pub struct OutboxRow {
37 pub id: String,
38 pub actor_id: String,
39 pub activity: serde_json::Value,
40 pub created_at: DateTime<Utc>,
41 pub delivery_state: String,
42}
43
44#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct InboxRow {
47 pub id: String,
48 pub actor_id: String,
49 pub activity: serde_json::Value,
50 pub received_at: DateTime<Utc>,
51}
52
/// A pending delivery read from the `delivery_queue` table.
#[derive(Clone, Debug)]
pub struct DeliveryItem {
    /// Row id of the queue entry (`delivery_queue.id`).
    pub queue_id: i64,
    /// Identifier of the activity to deliver.
    pub activity_id: String,
    /// Inbox URL the activity should be delivered to.
    pub inbox_url: String,
    /// Value of the `attempts` counter for this entry.
    pub attempts: i64,
    /// Error text stored by the most recent reschedule, if any.
    pub last_error: Option<String>,
}
62
/// DDL executed by `Store::connect`.
///
/// Every statement is `CREATE TABLE IF NOT EXISTS`, so running it against a
/// database that already has the tables leaves them untouched.
const SCHEMA: &str = r#"
CREATE TABLE IF NOT EXISTS followers (
    actor_id TEXT NOT NULL,
    follower_id TEXT NOT NULL,
    inbox TEXT,
    accepted_at DATETIME,
    PRIMARY KEY (actor_id, follower_id)
);
CREATE TABLE IF NOT EXISTS following (
    actor_id TEXT NOT NULL,
    target_id TEXT NOT NULL,
    requested_at DATETIME NOT NULL,
    accepted BOOLEAN NOT NULL DEFAULT 0,
    PRIMARY KEY (actor_id, target_id)
);
CREATE TABLE IF NOT EXISTS inbox (
    id TEXT PRIMARY KEY,
    actor_id TEXT NOT NULL,
    activity TEXT NOT NULL,
    received_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS outbox (
    id TEXT PRIMARY KEY,
    actor_id TEXT NOT NULL,
    activity TEXT NOT NULL,
    created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
    delivery_state TEXT NOT NULL DEFAULT 'pending'
);
CREATE TABLE IF NOT EXISTS delivery_queue (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    activity_id TEXT NOT NULL,
    inbox_url TEXT NOT NULL,
    attempts INTEGER NOT NULL DEFAULT 0,
    next_retry DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
    last_error TEXT
);
CREATE TABLE IF NOT EXISTS actors (
    id TEXT PRIMARY KEY,
    data TEXT NOT NULL,
    fetched_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
);
"#;
105
106impl Store {
107 pub async fn connect(url: &str) -> Result<Self, sqlx::Error> {
110 let pool = SqlitePoolOptions::new()
111 .max_connections(5)
112 .connect(url)
113 .await?;
114 sqlx::query(SCHEMA).execute(&pool).await?;
115 Ok(Self { pool })
116 }
117
118 pub async fn in_memory() -> Result<Self, sqlx::Error> {
120 Self::connect("sqlite::memory:?cache=shared").await
123 }
124
125 pub fn pool(&self) -> &SqlitePool {
128 &self.pool
129 }
130
131 pub async fn add_follower(
134 &self,
135 actor_id: &str,
136 follower_id: &str,
137 inbox: Option<&str>,
138 ) -> Result<(), sqlx::Error> {
139 let now = Utc::now();
140 sqlx::query(
141 "INSERT OR REPLACE INTO followers (actor_id, follower_id, inbox, accepted_at) \
142 VALUES (?1, ?2, ?3, ?4)",
143 )
144 .bind(actor_id)
145 .bind(follower_id)
146 .bind(inbox)
147 .bind(now)
148 .execute(&self.pool)
149 .await?;
150 Ok(())
151 }
152
153 pub async fn remove_follower(
154 &self,
155 actor_id: &str,
156 follower_id: &str,
157 ) -> Result<u64, sqlx::Error> {
158 let res = sqlx::query(
159 "DELETE FROM followers WHERE actor_id = ?1 AND follower_id = ?2",
160 )
161 .bind(actor_id)
162 .bind(follower_id)
163 .execute(&self.pool)
164 .await?;
165 Ok(res.rows_affected())
166 }
167
168 pub async fn is_follower(
169 &self,
170 actor_id: &str,
171 follower_id: &str,
172 ) -> Result<bool, sqlx::Error> {
173 let row: Option<(i64,)> = sqlx::query_as(
174 "SELECT 1 FROM followers WHERE actor_id = ?1 AND follower_id = ?2",
175 )
176 .bind(actor_id)
177 .bind(follower_id)
178 .fetch_optional(&self.pool)
179 .await?;
180 Ok(row.is_some())
181 }
182
183 pub async fn follower_inboxes(&self, actor_id: &str) -> Result<Vec<String>, sqlx::Error> {
184 let rows: Vec<(String,)> = sqlx::query_as(
185 "SELECT DISTINCT inbox FROM followers WHERE actor_id = ?1 AND inbox IS NOT NULL",
186 )
187 .bind(actor_id)
188 .fetch_all(&self.pool)
189 .await?;
190 Ok(rows.into_iter().map(|(s,)| s).collect())
191 }
192
193 pub async fn get_follower_inboxes(&self, actor_id: &str) -> Result<Vec<String>, sqlx::Error> {
198 self.follower_inboxes(actor_id).await
199 }
200
201 pub async fn follower_count(&self, actor_id: &str) -> Result<i64, sqlx::Error> {
202 let (n,): (i64,) =
203 sqlx::query_as("SELECT COUNT(*) FROM followers WHERE actor_id = ?1")
204 .bind(actor_id)
205 .fetch_one(&self.pool)
206 .await?;
207 Ok(n)
208 }
209
210 pub async fn add_following(
213 &self,
214 actor_id: &str,
215 target_id: &str,
216 ) -> Result<(), sqlx::Error> {
217 let now = Utc::now();
218 sqlx::query(
219 "INSERT OR REPLACE INTO following (actor_id, target_id, requested_at, accepted) \
220 VALUES (?1, ?2, ?3, 0)",
221 )
222 .bind(actor_id)
223 .bind(target_id)
224 .bind(now)
225 .execute(&self.pool)
226 .await?;
227 Ok(())
228 }
229
230 pub async fn accept_following(
231 &self,
232 actor_id: &str,
233 target_id: &str,
234 ) -> Result<u64, sqlx::Error> {
235 let res = sqlx::query(
236 "UPDATE following SET accepted = 1 WHERE actor_id = ?1 AND target_id = ?2",
237 )
238 .bind(actor_id)
239 .bind(target_id)
240 .execute(&self.pool)
241 .await?;
242 Ok(res.rows_affected())
243 }
244
245 pub async fn is_following(
246 &self,
247 actor_id: &str,
248 target_id: &str,
249 ) -> Result<bool, sqlx::Error> {
250 let row: Option<(i64,)> = sqlx::query_as(
251 "SELECT accepted FROM following WHERE actor_id = ?1 AND target_id = ?2",
252 )
253 .bind(actor_id)
254 .bind(target_id)
255 .fetch_optional(&self.pool)
256 .await?;
257 Ok(matches!(row, Some((1,))))
258 }
259
260 pub async fn record_inbox(
264 &self,
265 actor_id: &str,
266 activity: &serde_json::Value,
267 ) -> Result<bool, sqlx::Error> {
268 let id = activity
269 .get("id")
270 .and_then(|v| v.as_str())
271 .unwrap_or("")
272 .to_string();
273 if id.is_empty() {
274 return Ok(false);
275 }
276 let body = serde_json::to_string(activity).unwrap_or_else(|_| "{}".into());
277 let res = sqlx::query(
278 "INSERT OR IGNORE INTO inbox (id, actor_id, activity, received_at) \
279 VALUES (?1, ?2, ?3, ?4)",
280 )
281 .bind(&id)
282 .bind(actor_id)
283 .bind(&body)
284 .bind(Utc::now())
285 .execute(&self.pool)
286 .await?;
287 Ok(res.rows_affected() > 0)
288 }
289
290 pub async fn get_inbox(&self, id: &str) -> Result<Option<InboxRow>, sqlx::Error> {
291 let row: Option<(String, String, String, DateTime<Utc>)> = sqlx::query_as(
292 "SELECT id, actor_id, activity, received_at FROM inbox WHERE id = ?1",
293 )
294 .bind(id)
295 .fetch_optional(&self.pool)
296 .await?;
297 Ok(row.map(|(id, actor_id, activity, received_at)| InboxRow {
298 id,
299 actor_id,
300 activity: serde_json::from_str(&activity).unwrap_or(serde_json::Value::Null),
301 received_at,
302 }))
303 }
304
305 pub async fn inbox_count(&self) -> Result<i64, sqlx::Error> {
306 let (n,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM inbox")
307 .fetch_one(&self.pool)
308 .await?;
309 Ok(n)
310 }
311
312 pub async fn record_outbox(
315 &self,
316 actor_id: &str,
317 activity: &serde_json::Value,
318 ) -> Result<String, sqlx::Error> {
319 let id = activity
320 .get("id")
321 .and_then(|v| v.as_str())
322 .map(|s| s.to_string())
323 .unwrap_or_else(|| format!("urn:uuid:{}", uuid::Uuid::new_v4()));
324 let body = serde_json::to_string(activity).unwrap_or_else(|_| "{}".into());
325 sqlx::query(
326 "INSERT OR REPLACE INTO outbox (id, actor_id, activity, created_at, delivery_state) \
327 VALUES (?1, ?2, ?3, ?4, 'pending')",
328 )
329 .bind(&id)
330 .bind(actor_id)
331 .bind(&body)
332 .bind(Utc::now())
333 .execute(&self.pool)
334 .await?;
335 Ok(id)
336 }
337
338 pub async fn mark_outbox_state(
339 &self,
340 id: &str,
341 state: &str,
342 ) -> Result<u64, sqlx::Error> {
343 let res = sqlx::query("UPDATE outbox SET delivery_state = ?1 WHERE id = ?2")
344 .bind(state)
345 .bind(id)
346 .execute(&self.pool)
347 .await?;
348 Ok(res.rows_affected())
349 }
350
351 pub async fn outbox_count(&self) -> Result<i64, sqlx::Error> {
352 let (n,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM outbox")
353 .fetch_one(&self.pool)
354 .await?;
355 Ok(n)
356 }
357
358 pub async fn enqueue_delivery(
361 &self,
362 activity_id: &str,
363 inbox_url: &str,
364 ) -> Result<i64, sqlx::Error> {
365 let res = sqlx::query(
366 "INSERT INTO delivery_queue (activity_id, inbox_url, attempts, next_retry) \
367 VALUES (?1, ?2, 0, ?3)",
368 )
369 .bind(activity_id)
370 .bind(inbox_url)
371 .bind(Utc::now())
372 .execute(&self.pool)
373 .await?;
374 Ok(res.last_insert_rowid())
375 }
376
377 pub async fn next_due_delivery(&self) -> Result<Option<DeliveryItem>, sqlx::Error> {
378 let row: Option<(i64, String, String, i64, Option<String>)> = sqlx::query_as(
379 "SELECT id, activity_id, inbox_url, attempts, last_error FROM delivery_queue \
380 WHERE next_retry <= ?1 ORDER BY id ASC LIMIT 1",
381 )
382 .bind(Utc::now())
383 .fetch_optional(&self.pool)
384 .await?;
385 Ok(row.map(
386 |(queue_id, activity_id, inbox_url, attempts, last_error)| DeliveryItem {
387 queue_id,
388 activity_id,
389 inbox_url,
390 attempts,
391 last_error,
392 },
393 ))
394 }
395
396 pub async fn drop_delivery(&self, queue_id: i64) -> Result<u64, sqlx::Error> {
397 let res = sqlx::query("DELETE FROM delivery_queue WHERE id = ?1")
398 .bind(queue_id)
399 .execute(&self.pool)
400 .await?;
401 Ok(res.rows_affected())
402 }
403
404 pub async fn reschedule_delivery(
405 &self,
406 queue_id: i64,
407 delay_secs: i64,
408 error: &str,
409 ) -> Result<u64, sqlx::Error> {
410 let next_retry =
411 Utc::now() + chrono::Duration::seconds(delay_secs.max(0));
412 let res = sqlx::query(
413 "UPDATE delivery_queue \
414 SET attempts = attempts + 1, next_retry = ?1, last_error = ?2 \
415 WHERE id = ?3",
416 )
417 .bind(next_retry)
418 .bind(error)
419 .bind(queue_id)
420 .execute(&self.pool)
421 .await?;
422 Ok(res.rows_affected())
423 }
424
425 pub async fn cache_actor(
435 &self,
436 actor_id: &str,
437 data: &serde_json::Value,
438 ) -> Result<(), sqlx::Error> {
439 let body = serde_json::to_string(data).unwrap_or_else(|_| "{}".into());
440 let now = Utc::now();
441 sqlx::query(
442 "INSERT OR REPLACE INTO actors (id, data, fetched_at) VALUES (?1, ?2, ?3)",
443 )
444 .bind(actor_id)
445 .bind(&body)
446 .bind(now)
447 .execute(&self.pool)
448 .await?;
449 Ok(())
450 }
451
452 pub async fn get_cached_actor(
454 &self,
455 actor_id: &str,
456 ) -> Result<Option<(serde_json::Value, DateTime<Utc>)>, sqlx::Error> {
457 let row: Option<(String, DateTime<Utc>)> = sqlx::query_as(
458 "SELECT data, fetched_at FROM actors WHERE id = ?1",
459 )
460 .bind(actor_id)
461 .fetch_optional(&self.pool)
462 .await?;
463 Ok(row.map(|(data, fetched_at)| {
464 let parsed = serde_json::from_str(&data).unwrap_or(serde_json::Value::Null);
465 (parsed, fetched_at)
466 }))
467 }
468
469 pub async fn is_actor_cache_fresh(
478 &self,
479 actor_id: &str,
480 max_age: chrono::Duration,
481 ) -> Result<bool, sqlx::Error> {
482 match self.get_cached_actor(actor_id).await? {
483 Some((_data, fetched_at)) => {
484 let cutoff = Utc::now() - max_age;
485 Ok(fetched_at >= cutoff)
486 }
487 None => Ok(false),
488 }
489 }
490
491 pub async fn load_activity(
492 &self,
493 activity_id: &str,
494 ) -> Result<Option<serde_json::Value>, sqlx::Error> {
495 let row: Option<(String,)> =
496 sqlx::query_as("SELECT activity FROM outbox WHERE id = ?1")
497 .bind(activity_id)
498 .fetch_optional(&self.pool)
499 .await?;
500 Ok(row.and_then(|(s,)| serde_json::from_str(&s).ok()))
501 }
502}
503
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a brand-new in-memory store for a single test.
    async fn fresh() -> Store {
        Store::in_memory().await.unwrap()
    }

    // Add → query → remove cycle for a single follower.
    #[tokio::test]
    async fn followers_roundtrip() {
        let store = fresh().await;
        let inbox_url = "https://them/inbox";
        store
            .add_follower("me", "them", Some(inbox_url))
            .await
            .unwrap();

        assert!(store.is_follower("me", "them").await.unwrap());
        assert_eq!(store.follower_count("me").await.unwrap(), 1);
        let listed = store.follower_inboxes("me").await.unwrap();
        assert_eq!(listed, vec![inbox_url.to_string()]);

        store.remove_follower("me", "them").await.unwrap();
        assert!(!store.is_follower("me", "them").await.unwrap());
    }

    // A follow request only counts as "following" once it has been accepted.
    #[tokio::test]
    async fn following_lifecycle() {
        let store = fresh().await;
        let target = "https://other/actor";

        store.add_following("me", target).await.unwrap();
        assert!(!store.is_following("me", target).await.unwrap());

        store.accept_following("me", target).await.unwrap();
        assert!(store.is_following("me", target).await.unwrap());
    }

    // Recording the same activity id twice inserts only one row.
    #[tokio::test]
    async fn inbox_insert_is_idempotent_by_id() {
        let store = fresh().await;
        let activity = serde_json::json!({"id": "https://a/1", "type": "Create"});

        let first = store.record_inbox("me", &activity).await.unwrap();
        let second = store.record_inbox("me", &activity).await.unwrap();

        assert!(first);
        assert!(!second);
        assert_eq!(store.inbox_count().await.unwrap(), 1);
    }

    // The outbox keeps the activity's own id and lets its state be updated.
    #[tokio::test]
    async fn outbox_records_and_updates_state() {
        let store = fresh().await;
        let activity = serde_json::json!({"id": "https://me/out/1", "type": "Create"});

        let stored_id = store.record_outbox("me", &activity).await.unwrap();
        assert_eq!(stored_id, "https://me/out/1");
        assert_eq!(store.outbox_count().await.unwrap(), 1);

        let rows = store
            .mark_outbox_state(&stored_id, "delivered")
            .await
            .unwrap();
        assert_eq!(rows, 1);
    }

    // Enqueue → read → reschedule → read again → drop.
    #[tokio::test]
    async fn delivery_queue_roundtrip() {
        let store = fresh().await;
        let queue_id = store
            .enqueue_delivery("https://me/out/1", "https://them/inbox")
            .await
            .unwrap();

        let first = store.next_due_delivery().await.unwrap().unwrap();
        assert_eq!(first.queue_id, queue_id);
        assert_eq!(first.inbox_url, "https://them/inbox");

        // A zero delay keeps the entry due, but records the failed attempt.
        store
            .reschedule_delivery(queue_id, 0, "transient")
            .await
            .unwrap();
        let retried = store.next_due_delivery().await.unwrap().unwrap();
        assert_eq!(retried.attempts, 1);
        assert_eq!(retried.last_error.as_deref(), Some("transient"));

        store.drop_delivery(queue_id).await.unwrap();
        assert!(store.next_due_delivery().await.unwrap().is_none());
    }

    // A cached actor comes back with its document and a recent timestamp.
    #[tokio::test]
    async fn actor_cache_roundtrip() {
        let store = fresh().await;
        let actor_id = "https://remote.example/actor";
        let document = serde_json::json!({"type": "Person", "name": "Remote"});

        let before = store.get_cached_actor(actor_id).await.unwrap();
        assert!(before.is_none());

        store.cache_actor(actor_id, &document).await.unwrap();
        let (cached, fetched_at) = store.get_cached_actor(actor_id).await.unwrap().unwrap();
        assert_eq!(cached["name"], "Remote");

        let age = Utc::now() - fetched_at;
        assert!(age.num_seconds() < 5, "fetched_at too old: {age}");
    }

    // A just-cached actor is fresh; an actor that was never cached is not.
    #[tokio::test]
    async fn actor_cache_freshness_check() {
        let store = fresh().await;
        let actor_id = "https://remote.example/actor2";
        let document = serde_json::json!({"type": "Person"});
        store.cache_actor(actor_id, &document).await.unwrap();

        let cached_is_fresh = store
            .is_actor_cache_fresh(actor_id, chrono::Duration::hours(1))
            .await
            .unwrap();
        assert!(cached_is_fresh);

        let unknown_is_fresh = store
            .is_actor_cache_fresh("https://never-cached.example/actor", chrono::Duration::hours(1))
            .await
            .unwrap();
        assert!(!unknown_is_fresh);
    }

    // Re-caching replaces the document and never moves the timestamp back.
    #[tokio::test]
    async fn actor_cache_upsert_updates_fetched_at() {
        let store = fresh().await;
        let actor_id = "https://remote.example/actor3";

        let original = serde_json::json!({"name": "v1"});
        store.cache_actor(actor_id, &original).await.unwrap();
        let (_, first_ts) = store.get_cached_actor(actor_id).await.unwrap().unwrap();

        let replacement = serde_json::json!({"name": "v2"});
        store.cache_actor(actor_id, &replacement).await.unwrap();
        let (cached, second_ts) = store.get_cached_actor(actor_id).await.unwrap().unwrap();

        assert_eq!(cached["name"], "v2");
        assert!(second_ts >= first_ts, "fetched_at should not go backwards");
    }

    // The raw stored `fetched_at` text must parse as one of the accepted
    // datetime layouts.
    #[tokio::test]
    async fn actor_cache_datetime_is_iso8601() {
        let store = fresh().await;
        let actor_id = "https://remote.example/actor-dt";
        let document = serde_json::json!({"type": "Person"});
        store.cache_actor(actor_id, &document).await.unwrap();

        let (raw,): (String,) = sqlx::query_as(
            "SELECT fetched_at FROM actors WHERE id = ?1",
        )
        .bind(actor_id)
        .fetch_one(store.pool())
        .await
        .unwrap();

        // Parses `raw` as a naive datetime in `layout`, interpreted as UTC.
        let parse_naive = |layout: &str| {
            chrono::NaiveDateTime::parse_from_str(&raw, layout)
                .map(|ndt| ndt.and_utc().fixed_offset())
        };
        // Try RFC 3339 first, then the space- and `T`-separated naive forms.
        let parsed = chrono::DateTime::parse_from_rfc3339(&raw)
            .or_else(|_| parse_naive("%Y-%m-%d %H:%M:%S%.f"))
            .or_else(|_| parse_naive("%Y-%m-%dT%H:%M:%S%.f"));
        assert!(parsed.is_ok(), "fetched_at is not a valid datetime: {raw}");
    }

    // The alias returns the same inboxes; followers without one are skipped.
    #[tokio::test]
    async fn get_follower_inboxes_alias() {
        let store = fresh().await;
        store
            .add_follower("actor-a", "f1", Some("https://f1/inbox"))
            .await
            .unwrap();
        store
            .add_follower("actor-a", "f2", Some("https://f2/inbox"))
            .await
            .unwrap();
        store.add_follower("actor-a", "f3", None).await.unwrap();

        let listed = store.get_follower_inboxes("actor-a").await.unwrap();
        assert_eq!(listed.len(), 2);
        for expected in ["https://f1/inbox", "https://f2/inbox"] {
            assert!(listed.contains(&expected.to_string()));
        }
    }

    // Enqueuing one delivery per follower inbox yields one queue row each.
    #[tokio::test]
    async fn follower_inbox_fanout_enqueues_all() {
        let store = fresh().await;
        let actor_id = "me";
        let followers = [
            ("a", "https://a/inbox"),
            ("b", "https://b/inbox"),
            ("c", "https://c/inbox"),
        ];
        for (follower_id, inbox_url) in followers {
            store
                .add_follower(actor_id, follower_id, Some(inbox_url))
                .await
                .unwrap();
        }

        let activity_id = "https://me/out/fanout-1";
        let targets = store.get_follower_inboxes(actor_id).await.unwrap();
        for target in &targets {
            store.enqueue_delivery(activity_id, target).await.unwrap();
        }

        let (queued,): (i64,) = sqlx::query_as(
            "SELECT COUNT(*) FROM delivery_queue WHERE activity_id = ?1",
        )
        .bind(activity_id)
        .fetch_one(store.pool())
        .await
        .unwrap();
        assert_eq!(queued, 3);
    }
}