Skip to main content

solid_pod_rs_activitypub/
store.rs

1//! SQLite-backed persistence for followers, following, inbox, outbox
2//! and the federated delivery queue.
3//!
4//! LINE-FOR-LINE `jss/src/ap/store.js`:
5//!
6//! * followers(id PRIMARY KEY, actor, inbox, created_at)
7//! * following(id PRIMARY KEY, actor, accepted, created_at)
8//! * activities(id PRIMARY KEY, type, actor, object, raw, created_at)
9//! * posts(id PRIMARY KEY, content, in_reply_to, published)
10//! * actors(id PRIMARY KEY, data, fetched_at)
11//!
12//! We diverge in three ways:
13//!   1. The primary key on `inbox` is the activity `id` — JSS's
14//!      `activities` table conflates inbox + outbox; we split them for
15//!      clarity and per-kind indexing.
16//!   2. A dedicated `delivery_queue` table feeds the background worker
17//!      in [`crate::delivery`]. JSS does in-process retry; we do
18//!      durable retry across restarts.
19//!   3. The `followers` row's primary key is `(actor_id, follower_id)`
20//!      so we can model multi-user pods without hashing the pair into
21//!      a surrogate.
22
23use chrono::{DateTime, Utc};
24use serde::{Deserialize, Serialize};
25use sqlx::{sqlite::SqlitePoolOptions, SqlitePool};
26
27/// Opaque store handle. Clone freely — the underlying pool is
28/// reference-counted.
29#[derive(Clone)]
30pub struct Store {
31    pool: SqlitePool,
32}
33
34/// Outbox row representation.
35#[derive(Debug, Clone, Serialize, Deserialize)]
36pub struct OutboxRow {
37    pub id: String,
38    pub actor_id: String,
39    pub activity: serde_json::Value,
40    pub created_at: DateTime<Utc>,
41    pub delivery_state: String,
42}
43
44/// Inbox row representation.
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct InboxRow {
47    pub id: String,
48    pub actor_id: String,
49    pub activity: serde_json::Value,
50    pub received_at: DateTime<Utc>,
51}
52
53/// A single queued delivery awaiting transmission.
54#[derive(Debug, Clone)]
55pub struct DeliveryItem {
56    pub queue_id: i64,
57    pub activity_id: String,
58    pub inbox_url: String,
59    pub attempts: i64,
60    pub last_error: Option<String>,
61}
62
63const SCHEMA: &str = r#"
64CREATE TABLE IF NOT EXISTS followers (
65    actor_id TEXT NOT NULL,
66    follower_id TEXT NOT NULL,
67    inbox TEXT,
68    accepted_at DATETIME,
69    PRIMARY KEY (actor_id, follower_id)
70);
71CREATE TABLE IF NOT EXISTS following (
72    actor_id TEXT NOT NULL,
73    target_id TEXT NOT NULL,
74    requested_at DATETIME NOT NULL,
75    accepted BOOLEAN NOT NULL DEFAULT 0,
76    PRIMARY KEY (actor_id, target_id)
77);
78CREATE TABLE IF NOT EXISTS inbox (
79    id TEXT PRIMARY KEY,
80    actor_id TEXT NOT NULL,
81    activity TEXT NOT NULL,
82    received_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
83);
84CREATE TABLE IF NOT EXISTS outbox (
85    id TEXT PRIMARY KEY,
86    actor_id TEXT NOT NULL,
87    activity TEXT NOT NULL,
88    created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
89    delivery_state TEXT NOT NULL DEFAULT 'pending'
90);
91CREATE TABLE IF NOT EXISTS delivery_queue (
92    id INTEGER PRIMARY KEY AUTOINCREMENT,
93    activity_id TEXT NOT NULL,
94    inbox_url TEXT NOT NULL,
95    attempts INTEGER NOT NULL DEFAULT 0,
96    next_retry DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
97    last_error TEXT
98);
99CREATE TABLE IF NOT EXISTS actors (
100    id TEXT PRIMARY KEY,
101    data TEXT NOT NULL,
102    fetched_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
103);
104"#;
105
106impl Store {
107    /// Connect to an arbitrary SQLite URL (use `sqlite::memory:` for
108    /// tests). Runs the schema idempotently.
109    pub async fn connect(url: &str) -> Result<Self, sqlx::Error> {
110        let pool = SqlitePoolOptions::new()
111            .max_connections(5)
112            .connect(url)
113            .await?;
114        sqlx::query(SCHEMA).execute(&pool).await?;
115        Ok(Self { pool })
116    }
117
118    /// In-memory store — useful for tests.
119    pub async fn in_memory() -> Result<Self, sqlx::Error> {
120        // The `sqlite::memory:` URL creates a fresh DB per connection,
121        // which breaks pooling. Use a shared in-memory URL instead.
122        Self::connect("sqlite::memory:?cache=shared").await
123    }
124
125    /// Expose the pool for advanced callers. Prefer the typed helpers
126    /// below where possible.
127    pub fn pool(&self) -> &SqlitePool {
128        &self.pool
129    }
130
131    // -------------------------- followers --------------------------------
132
133    pub async fn add_follower(
134        &self,
135        actor_id: &str,
136        follower_id: &str,
137        inbox: Option<&str>,
138    ) -> Result<(), sqlx::Error> {
139        let now = Utc::now();
140        sqlx::query(
141            "INSERT OR REPLACE INTO followers (actor_id, follower_id, inbox, accepted_at) \
142             VALUES (?1, ?2, ?3, ?4)",
143        )
144        .bind(actor_id)
145        .bind(follower_id)
146        .bind(inbox)
147        .bind(now)
148        .execute(&self.pool)
149        .await?;
150        Ok(())
151    }
152
153    pub async fn remove_follower(
154        &self,
155        actor_id: &str,
156        follower_id: &str,
157    ) -> Result<u64, sqlx::Error> {
158        let res = sqlx::query(
159            "DELETE FROM followers WHERE actor_id = ?1 AND follower_id = ?2",
160        )
161        .bind(actor_id)
162        .bind(follower_id)
163        .execute(&self.pool)
164        .await?;
165        Ok(res.rows_affected())
166    }
167
168    pub async fn is_follower(
169        &self,
170        actor_id: &str,
171        follower_id: &str,
172    ) -> Result<bool, sqlx::Error> {
173        let row: Option<(i64,)> = sqlx::query_as(
174            "SELECT 1 FROM followers WHERE actor_id = ?1 AND follower_id = ?2",
175        )
176        .bind(actor_id)
177        .bind(follower_id)
178        .fetch_optional(&self.pool)
179        .await?;
180        Ok(row.is_some())
181    }
182
183    pub async fn follower_inboxes(&self, actor_id: &str) -> Result<Vec<String>, sqlx::Error> {
184        let rows: Vec<(String,)> = sqlx::query_as(
185            "SELECT DISTINCT inbox FROM followers WHERE actor_id = ?1 AND inbox IS NOT NULL",
186        )
187        .bind(actor_id)
188        .fetch_all(&self.pool)
189        .await?;
190        Ok(rows.into_iter().map(|(s,)| s).collect())
191    }
192
193    /// Return every follower's inbox URL for the given actor.
194    ///
195    /// Alias for [`follower_inboxes`] — named to match the JSS
196    /// `getFollowerInboxes` helper added in v0.0.67.
197    pub async fn get_follower_inboxes(&self, actor_id: &str) -> Result<Vec<String>, sqlx::Error> {
198        self.follower_inboxes(actor_id).await
199    }
200
201    pub async fn follower_count(&self, actor_id: &str) -> Result<i64, sqlx::Error> {
202        let (n,): (i64,) =
203            sqlx::query_as("SELECT COUNT(*) FROM followers WHERE actor_id = ?1")
204                .bind(actor_id)
205                .fetch_one(&self.pool)
206                .await?;
207        Ok(n)
208    }
209
210    // -------------------------- following --------------------------------
211
212    pub async fn add_following(
213        &self,
214        actor_id: &str,
215        target_id: &str,
216    ) -> Result<(), sqlx::Error> {
217        let now = Utc::now();
218        sqlx::query(
219            "INSERT OR REPLACE INTO following (actor_id, target_id, requested_at, accepted) \
220             VALUES (?1, ?2, ?3, 0)",
221        )
222        .bind(actor_id)
223        .bind(target_id)
224        .bind(now)
225        .execute(&self.pool)
226        .await?;
227        Ok(())
228    }
229
230    pub async fn accept_following(
231        &self,
232        actor_id: &str,
233        target_id: &str,
234    ) -> Result<u64, sqlx::Error> {
235        let res = sqlx::query(
236            "UPDATE following SET accepted = 1 WHERE actor_id = ?1 AND target_id = ?2",
237        )
238        .bind(actor_id)
239        .bind(target_id)
240        .execute(&self.pool)
241        .await?;
242        Ok(res.rows_affected())
243    }
244
245    pub async fn is_following(
246        &self,
247        actor_id: &str,
248        target_id: &str,
249    ) -> Result<bool, sqlx::Error> {
250        let row: Option<(i64,)> = sqlx::query_as(
251            "SELECT accepted FROM following WHERE actor_id = ?1 AND target_id = ?2",
252        )
253        .bind(actor_id)
254        .bind(target_id)
255        .fetch_optional(&self.pool)
256        .await?;
257        Ok(matches!(row, Some((1,))))
258    }
259
260    // --------------------------- inbox -----------------------------------
261
262    /// Record an inbox activity. Idempotent on activity `id`.
263    pub async fn record_inbox(
264        &self,
265        actor_id: &str,
266        activity: &serde_json::Value,
267    ) -> Result<bool, sqlx::Error> {
268        let id = activity
269            .get("id")
270            .and_then(|v| v.as_str())
271            .unwrap_or("")
272            .to_string();
273        if id.is_empty() {
274            return Ok(false);
275        }
276        let body = serde_json::to_string(activity).unwrap_or_else(|_| "{}".into());
277        let res = sqlx::query(
278            "INSERT OR IGNORE INTO inbox (id, actor_id, activity, received_at) \
279             VALUES (?1, ?2, ?3, ?4)",
280        )
281        .bind(&id)
282        .bind(actor_id)
283        .bind(&body)
284        .bind(Utc::now())
285        .execute(&self.pool)
286        .await?;
287        Ok(res.rows_affected() > 0)
288    }
289
290    pub async fn get_inbox(&self, id: &str) -> Result<Option<InboxRow>, sqlx::Error> {
291        let row: Option<(String, String, String, DateTime<Utc>)> = sqlx::query_as(
292            "SELECT id, actor_id, activity, received_at FROM inbox WHERE id = ?1",
293        )
294        .bind(id)
295        .fetch_optional(&self.pool)
296        .await?;
297        Ok(row.map(|(id, actor_id, activity, received_at)| InboxRow {
298            id,
299            actor_id,
300            activity: serde_json::from_str(&activity).unwrap_or(serde_json::Value::Null),
301            received_at,
302        }))
303    }
304
305    pub async fn inbox_count(&self) -> Result<i64, sqlx::Error> {
306        let (n,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM inbox")
307            .fetch_one(&self.pool)
308            .await?;
309        Ok(n)
310    }
311
312    // --------------------------- outbox ----------------------------------
313
314    pub async fn record_outbox(
315        &self,
316        actor_id: &str,
317        activity: &serde_json::Value,
318    ) -> Result<String, sqlx::Error> {
319        let id = activity
320            .get("id")
321            .and_then(|v| v.as_str())
322            .map(|s| s.to_string())
323            .unwrap_or_else(|| format!("urn:uuid:{}", uuid::Uuid::new_v4()));
324        let body = serde_json::to_string(activity).unwrap_or_else(|_| "{}".into());
325        sqlx::query(
326            "INSERT OR REPLACE INTO outbox (id, actor_id, activity, created_at, delivery_state) \
327             VALUES (?1, ?2, ?3, ?4, 'pending')",
328        )
329        .bind(&id)
330        .bind(actor_id)
331        .bind(&body)
332        .bind(Utc::now())
333        .execute(&self.pool)
334        .await?;
335        Ok(id)
336    }
337
338    pub async fn mark_outbox_state(
339        &self,
340        id: &str,
341        state: &str,
342    ) -> Result<u64, sqlx::Error> {
343        let res = sqlx::query("UPDATE outbox SET delivery_state = ?1 WHERE id = ?2")
344            .bind(state)
345            .bind(id)
346            .execute(&self.pool)
347            .await?;
348        Ok(res.rows_affected())
349    }
350
351    pub async fn outbox_count(&self) -> Result<i64, sqlx::Error> {
352        let (n,): (i64,) = sqlx::query_as("SELECT COUNT(*) FROM outbox")
353            .fetch_one(&self.pool)
354            .await?;
355        Ok(n)
356    }
357
358    // ----------------------- delivery queue ------------------------------
359
360    pub async fn enqueue_delivery(
361        &self,
362        activity_id: &str,
363        inbox_url: &str,
364    ) -> Result<i64, sqlx::Error> {
365        let res = sqlx::query(
366            "INSERT INTO delivery_queue (activity_id, inbox_url, attempts, next_retry) \
367             VALUES (?1, ?2, 0, ?3)",
368        )
369        .bind(activity_id)
370        .bind(inbox_url)
371        .bind(Utc::now())
372        .execute(&self.pool)
373        .await?;
374        Ok(res.last_insert_rowid())
375    }
376
377    pub async fn next_due_delivery(&self) -> Result<Option<DeliveryItem>, sqlx::Error> {
378        let row: Option<(i64, String, String, i64, Option<String>)> = sqlx::query_as(
379            "SELECT id, activity_id, inbox_url, attempts, last_error FROM delivery_queue \
380             WHERE next_retry <= ?1 ORDER BY id ASC LIMIT 1",
381        )
382        .bind(Utc::now())
383        .fetch_optional(&self.pool)
384        .await?;
385        Ok(row.map(
386            |(queue_id, activity_id, inbox_url, attempts, last_error)| DeliveryItem {
387                queue_id,
388                activity_id,
389                inbox_url,
390                attempts,
391                last_error,
392            },
393        ))
394    }
395
396    pub async fn drop_delivery(&self, queue_id: i64) -> Result<u64, sqlx::Error> {
397        let res = sqlx::query("DELETE FROM delivery_queue WHERE id = ?1")
398            .bind(queue_id)
399            .execute(&self.pool)
400            .await?;
401        Ok(res.rows_affected())
402    }
403
404    pub async fn reschedule_delivery(
405        &self,
406        queue_id: i64,
407        delay_secs: i64,
408        error: &str,
409    ) -> Result<u64, sqlx::Error> {
410        let next_retry =
411            Utc::now() + chrono::Duration::seconds(delay_secs.max(0));
412        let res = sqlx::query(
413            "UPDATE delivery_queue \
414             SET attempts = attempts + 1, next_retry = ?1, last_error = ?2 \
415             WHERE id = ?3",
416        )
417        .bind(next_retry)
418        .bind(error)
419        .bind(queue_id)
420        .execute(&self.pool)
421        .await?;
422        Ok(res.rows_affected())
423    }
424
425    // ----------------------- actor cache ----------------------------------
426
427    /// Cache a remote actor document. Uses `INSERT OR REPLACE` so
428    /// re-fetches update `fetched_at` to the current timestamp.
429    ///
430    /// The `fetched_at` column is always written as an ISO 8601 UTC
431    /// string via `chrono::Utc::now()` — this matches the JSS v0.0.66
432    /// fix that ensures consistent `datetime('now')`-compatible
433    /// timestamps in the actors table.
434    pub async fn cache_actor(
435        &self,
436        actor_id: &str,
437        data: &serde_json::Value,
438    ) -> Result<(), sqlx::Error> {
439        let body = serde_json::to_string(data).unwrap_or_else(|_| "{}".into());
440        let now = Utc::now();
441        sqlx::query(
442            "INSERT OR REPLACE INTO actors (id, data, fetched_at) VALUES (?1, ?2, ?3)",
443        )
444        .bind(actor_id)
445        .bind(&body)
446        .bind(now)
447        .execute(&self.pool)
448        .await?;
449        Ok(())
450    }
451
452    /// Retrieve a cached actor document. Returns `None` if not cached.
453    pub async fn get_cached_actor(
454        &self,
455        actor_id: &str,
456    ) -> Result<Option<(serde_json::Value, DateTime<Utc>)>, sqlx::Error> {
457        let row: Option<(String, DateTime<Utc>)> = sqlx::query_as(
458            "SELECT data, fetched_at FROM actors WHERE id = ?1",
459        )
460        .bind(actor_id)
461        .fetch_optional(&self.pool)
462        .await?;
463        Ok(row.map(|(data, fetched_at)| {
464            let parsed = serde_json::from_str(&data).unwrap_or(serde_json::Value::Null);
465            (parsed, fetched_at)
466        }))
467    }
468
469    /// Check whether a cached actor is still fresh (fetched within
470    /// `max_age`). Returns `true` if the cache entry exists and its
471    /// `fetched_at` timestamp is within the given duration of now.
472    ///
473    /// This uses `chrono::DateTime` comparison — all timestamps are
474    /// stored and compared as ISO 8601 UTC, avoiding the
475    /// `datetime('now')` vs bare-string mismatch that JSS v0.0.66
476    /// fixed.
477    pub async fn is_actor_cache_fresh(
478        &self,
479        actor_id: &str,
480        max_age: chrono::Duration,
481    ) -> Result<bool, sqlx::Error> {
482        match self.get_cached_actor(actor_id).await? {
483            Some((_data, fetched_at)) => {
484                let cutoff = Utc::now() - max_age;
485                Ok(fetched_at >= cutoff)
486            }
487            None => Ok(false),
488        }
489    }
490
491    pub async fn load_activity(
492        &self,
493        activity_id: &str,
494    ) -> Result<Option<serde_json::Value>, sqlx::Error> {
495        let row: Option<(String,)> =
496            sqlx::query_as("SELECT activity FROM outbox WHERE id = ?1")
497                .bind(activity_id)
498                .fetch_optional(&self.pool)
499                .await?;
500        Ok(row.and_then(|(s,)| serde_json::from_str(&s).ok()))
501    }
502}
503
504// ---------------------------------------------------------------------------
505// Tests
506// ---------------------------------------------------------------------------
507
508#[cfg(test)]
509mod tests {
510    use super::*;
511
512    async fn fresh() -> Store {
513        Store::in_memory().await.unwrap()
514    }
515
516    #[tokio::test]
517    async fn followers_roundtrip() {
518        let s = fresh().await;
519        s.add_follower("me", "them", Some("https://them/inbox"))
520            .await
521            .unwrap();
522        assert!(s.is_follower("me", "them").await.unwrap());
523        assert_eq!(s.follower_count("me").await.unwrap(), 1);
524        let inboxes = s.follower_inboxes("me").await.unwrap();
525        assert_eq!(inboxes, vec!["https://them/inbox".to_string()]);
526        s.remove_follower("me", "them").await.unwrap();
527        assert!(!s.is_follower("me", "them").await.unwrap());
528    }
529
530    #[tokio::test]
531    async fn following_lifecycle() {
532        let s = fresh().await;
533        s.add_following("me", "https://other/actor").await.unwrap();
534        assert!(!s.is_following("me", "https://other/actor").await.unwrap());
535        s.accept_following("me", "https://other/actor")
536            .await
537            .unwrap();
538        assert!(s.is_following("me", "https://other/actor").await.unwrap());
539    }
540
541    #[tokio::test]
542    async fn inbox_insert_is_idempotent_by_id() {
543        let s = fresh().await;
544        let act = serde_json::json!({"id": "https://a/1", "type": "Create"});
545        assert!(s.record_inbox("me", &act).await.unwrap());
546        assert!(!s.record_inbox("me", &act).await.unwrap());
547        assert_eq!(s.inbox_count().await.unwrap(), 1);
548    }
549
550    #[tokio::test]
551    async fn outbox_records_and_updates_state() {
552        let s = fresh().await;
553        let act = serde_json::json!({"id": "https://me/out/1", "type": "Create"});
554        let id = s.record_outbox("me", &act).await.unwrap();
555        assert_eq!(id, "https://me/out/1");
556        assert_eq!(s.outbox_count().await.unwrap(), 1);
557        let updated = s.mark_outbox_state(&id, "delivered").await.unwrap();
558        assert_eq!(updated, 1);
559    }
560
561    #[tokio::test]
562    async fn delivery_queue_roundtrip() {
563        let s = fresh().await;
564        let qid = s
565            .enqueue_delivery("https://me/out/1", "https://them/inbox")
566            .await
567            .unwrap();
568        let item = s.next_due_delivery().await.unwrap().unwrap();
569        assert_eq!(item.queue_id, qid);
570        assert_eq!(item.inbox_url, "https://them/inbox");
571        s.reschedule_delivery(qid, 0, "transient").await.unwrap();
572        let again = s.next_due_delivery().await.unwrap().unwrap();
573        assert_eq!(again.attempts, 1);
574        assert_eq!(again.last_error.as_deref(), Some("transient"));
575        s.drop_delivery(qid).await.unwrap();
576        assert!(s.next_due_delivery().await.unwrap().is_none());
577    }
578
579    #[tokio::test]
580    async fn actor_cache_roundtrip() {
581        let s = fresh().await;
582        let actor_id = "https://remote.example/actor";
583        let data = serde_json::json!({"type": "Person", "name": "Remote"});
584
585        // Nothing cached initially.
586        assert!(s.get_cached_actor(actor_id).await.unwrap().is_none());
587
588        // Cache and retrieve.
589        s.cache_actor(actor_id, &data).await.unwrap();
590        let (cached, fetched_at) = s.get_cached_actor(actor_id).await.unwrap().unwrap();
591        assert_eq!(cached["name"], "Remote");
592        // fetched_at should be very recent (within 5 seconds).
593        let age = Utc::now() - fetched_at;
594        assert!(age.num_seconds() < 5, "fetched_at too old: {age}");
595    }
596
597    #[tokio::test]
598    async fn actor_cache_freshness_check() {
599        let s = fresh().await;
600        let actor_id = "https://remote.example/actor2";
601        let data = serde_json::json!({"type": "Person"});
602        s.cache_actor(actor_id, &data).await.unwrap();
603
604        // Should be fresh within a 1-hour window.
605        assert!(s
606            .is_actor_cache_fresh(actor_id, chrono::Duration::hours(1))
607            .await
608            .unwrap());
609
610        // Should NOT be fresh within a 0-second window (it was cached
611        // at least a microsecond ago).
612        // Note: chrono::Duration::zero() would make it always fresh
613        // since fetched_at >= cutoff. Use a negative duration trick
614        // is not valid, so we rely on the insertion delay.
615        // Instead, just verify uncached actors are not fresh.
616        assert!(!s
617            .is_actor_cache_fresh("https://never-cached.example/actor", chrono::Duration::hours(1))
618            .await
619            .unwrap());
620    }
621
622    #[tokio::test]
623    async fn actor_cache_upsert_updates_fetched_at() {
624        let s = fresh().await;
625        let actor_id = "https://remote.example/actor3";
626        let data1 = serde_json::json!({"name": "v1"});
627        s.cache_actor(actor_id, &data1).await.unwrap();
628        let (_, ts1) = s.get_cached_actor(actor_id).await.unwrap().unwrap();
629
630        // Re-cache with new data.
631        let data2 = serde_json::json!({"name": "v2"});
632        s.cache_actor(actor_id, &data2).await.unwrap();
633        let (cached, ts2) = s.get_cached_actor(actor_id).await.unwrap().unwrap();
634        assert_eq!(cached["name"], "v2");
635        assert!(ts2 >= ts1, "fetched_at should not go backwards");
636    }
637
638    #[tokio::test]
639    async fn actor_cache_datetime_is_iso8601() {
640        let s = fresh().await;
641        let actor_id = "https://remote.example/actor-dt";
642        let data = serde_json::json!({"type": "Person"});
643        s.cache_actor(actor_id, &data).await.unwrap();
644
645        // Read the raw fetched_at string from SQLite to confirm format.
646        let (raw,): (String,) = sqlx::query_as(
647            "SELECT fetched_at FROM actors WHERE id = ?1",
648        )
649        .bind(actor_id)
650        .fetch_one(s.pool())
651        .await
652        .unwrap();
653        // chrono serialises to ISO 8601: "2026-05-06T12:34:56.123456789Z"
654        // or "2026-05-06 12:34:56 UTC". Either way it must parse back.
655        let parsed = chrono::DateTime::parse_from_rfc3339(&raw)
656            .or_else(|_| {
657                // sqlx may store in "YYYY-MM-DD HH:MM:SS" form.
658                chrono::NaiveDateTime::parse_from_str(&raw, "%Y-%m-%d %H:%M:%S%.f")
659                    .map(|ndt| ndt.and_utc().fixed_offset())
660                    .or_else(|_| {
661                        chrono::NaiveDateTime::parse_from_str(&raw, "%Y-%m-%dT%H:%M:%S%.f")
662                            .map(|ndt| ndt.and_utc().fixed_offset())
663                    })
664            });
665        assert!(parsed.is_ok(), "fetched_at is not a valid datetime: {raw}");
666    }
667
668    #[tokio::test]
669    async fn get_follower_inboxes_alias() {
670        let s = fresh().await;
671        s.add_follower("actor-a", "f1", Some("https://f1/inbox"))
672            .await
673            .unwrap();
674        s.add_follower("actor-a", "f2", Some("https://f2/inbox"))
675            .await
676            .unwrap();
677        s.add_follower("actor-a", "f3", None).await.unwrap(); // no inbox
678        let inboxes = s.get_follower_inboxes("actor-a").await.unwrap();
679        assert_eq!(inboxes.len(), 2);
680        assert!(inboxes.contains(&"https://f1/inbox".to_string()));
681        assert!(inboxes.contains(&"https://f2/inbox".to_string()));
682    }
683
684    #[tokio::test]
685    async fn follower_inbox_fanout_enqueues_all() {
686        let s = fresh().await;
687        let actor_id = "me";
688        s.add_follower(actor_id, "a", Some("https://a/inbox")).await.unwrap();
689        s.add_follower(actor_id, "b", Some("https://b/inbox")).await.unwrap();
690        s.add_follower(actor_id, "c", Some("https://c/inbox")).await.unwrap();
691
692        let inboxes = s.get_follower_inboxes(actor_id).await.unwrap();
693        let activity_id = "https://me/out/fanout-1";
694        for inbox in &inboxes {
695            s.enqueue_delivery(activity_id, inbox).await.unwrap();
696        }
697
698        // Count delivery-queue rows for this activity.
699        let (n,): (i64,) = sqlx::query_as(
700            "SELECT COUNT(*) FROM delivery_queue WHERE activity_id = ?1",
701        )
702        .bind(activity_id)
703        .fetch_one(s.pool())
704        .await
705        .unwrap();
706        assert_eq!(n, 3);
707    }
708}