Skip to main content

shuttle_rs/
store.rs

1use std::path::Path;
2use std::sync::{Arc, Mutex};
3
4use crate::core::{Event, EventFilter, EventStore, EventType, Result, ShuttleError};
5use async_trait::async_trait;
6use chrono::{DateTime, Utc};
7use rusqlite::types::Value;
8use rusqlite::{params, params_from_iter, Connection, OptionalExtension};
9use uuid::Uuid;
10
11#[derive(Clone)]
12pub struct SqliteEventStore {
13    conn: Arc<Mutex<Connection>>,
14}
15
16impl SqliteEventStore {
17    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
18        let conn = Connection::open(path).map_err(to_store_error)?;
19        let store = Self {
20            conn: Arc::new(Mutex::new(conn)),
21        };
22        store.init()?;
23        Ok(store)
24    }
25
26    pub fn init(&self) -> Result<()> {
27        let conn = self
28            .conn
29            .lock()
30            .map_err(|err| ShuttleError::Store(err.to_string()))?;
31        conn.execute_batch(
32            r#"
33            CREATE TABLE IF NOT EXISTS events (
34                id TEXT PRIMARY KEY NOT NULL,
35                event_type TEXT NOT NULL,
36                workspace_id TEXT NOT NULL,
37                repo_id TEXT,
38                repo_path TEXT,
39                git_remote TEXT,
40                bit_repo_id TEXT,
41                branch TEXT,
42                commit_hash TEXT,
43                repo_dirty INTEGER,
44                agent TEXT NOT NULL,
45                session_id TEXT NOT NULL,
46                title TEXT,
47                content TEXT NOT NULL,
48                tags TEXT NOT NULL,
49                metadata_json TEXT NOT NULL DEFAULT '{}',
50                created_at TEXT NOT NULL
51            );
52
53            CREATE TABLE IF NOT EXISTS event_tags (
54                event_id TEXT NOT NULL,
55                tag TEXT NOT NULL,
56                PRIMARY KEY (event_id, tag),
57                FOREIGN KEY (event_id) REFERENCES events(id)
58            );
59
60            CREATE INDEX IF NOT EXISTS idx_events_type_created ON events(event_type, created_at);
61            CREATE INDEX IF NOT EXISTS idx_events_workspace_created ON events(workspace_id, created_at);
62            CREATE INDEX IF NOT EXISTS idx_events_agent_created ON events(agent, created_at);
63            CREATE INDEX IF NOT EXISTS idx_event_tags_tag ON event_tags(tag);
64            "#,
65        )
66        .map_err(to_store_error)?;
67        ensure_column(&conn, "repo_path", "TEXT")?;
68        ensure_column(&conn, "git_remote", "TEXT")?;
69        ensure_column(&conn, "bit_repo_id", "TEXT")?;
70        ensure_column(&conn, "repo_dirty", "INTEGER")?;
71        ensure_column(&conn, "metadata_json", "TEXT NOT NULL DEFAULT '{}'")?;
72        backfill_event_tags(&conn)?;
73        Ok(())
74    }
75
76    pub fn append_if_absent(&self, event: Event) -> Result<bool> {
77        let mut conn = self
78            .conn
79            .lock()
80            .map_err(|err| ShuttleError::Store(err.to_string()))?;
81        insert_event(&mut conn, event, InsertMode::IgnoreDuplicates)
82    }
83}
84
85#[async_trait]
86impl EventStore for SqliteEventStore {
87    async fn append(&self, event: Event) -> Result<Event> {
88        let mut conn = self
89            .conn
90            .lock()
91            .map_err(|err| ShuttleError::Store(err.to_string()))?;
92        insert_event(&mut conn, event.clone(), InsertMode::Strict)?;
93        Ok(event)
94    }
95
96    async fn list(&self, filter: EventFilter) -> Result<Vec<Event>> {
97        let conn = self
98            .conn
99            .lock()
100            .map_err(|err| ShuttleError::Store(err.to_string()))?;
101        let limit = filter.limit.unwrap_or(50);
102        let event_type = filter
103            .event_type
104            .map(|event_type| Value::Text(event_type.as_str().to_owned()));
105        let query = filter
106            .query
107            .as_ref()
108            .map(|query| Value::Text(format!("%{}%", query.to_lowercase())));
109        let tags = filter_tags(&filter);
110        let mut sql = r#"
111                SELECT id, event_type, workspace_id, repo_id, repo_path, git_remote, bit_repo_id, branch, commit_hash, repo_dirty,
112                       agent, session_id, title, content, tags, metadata_json, created_at
113                FROM events
114                WHERE (?1 IS NULL OR event_type = ?1)
115                  AND (?2 IS NULL OR workspace_id = ?2)
116                  AND (?3 IS NULL OR agent = ?3)
117                  AND (?6 IS NULL OR id = ?6)
118                  AND (
119                    ?4 IS NULL
120                    OR json_extract(metadata_json, '$.to') = ?4
121                    OR EXISTS (
122                      SELECT 1 FROM event_tags
123                      WHERE event_tags.event_id = events.id AND event_tags.tag = ('to:' || ?4)
124                    )
125                  )
126                  AND (
127                    ?5 IS NULL
128                    OR lower(coalesce(title, '')) LIKE ?5
129                    OR lower(content) LIKE ?5
130                    OR lower(tags) LIKE ?5
131                    OR lower(metadata_json) LIKE ?5
132                  )
133        "#
134        .to_owned();
135        let mut values = vec![
136            event_type.unwrap_or(Value::Null),
137            filter.workspace_id.map(Value::Text).unwrap_or(Value::Null),
138            filter.agent.map(Value::Text).unwrap_or(Value::Null),
139            filter.recipient.map(Value::Text).unwrap_or(Value::Null),
140            query.unwrap_or(Value::Null),
141            filter
142                .id
143                .map(|id| Value::Text(id.to_string()))
144                .unwrap_or(Value::Null),
145        ];
146        for tag in tags {
147            let index = values.len() + 1;
148            sql.push_str(&format!(
149                " AND EXISTS (SELECT 1 FROM event_tags WHERE event_tags.event_id = events.id AND event_tags.tag = ?{index})"
150            ));
151            values.push(Value::Text(tag));
152        }
153        let limit_index = values.len() + 1;
154        sql.push_str(&format!(" ORDER BY created_at DESC LIMIT ?{limit_index}"));
155        values.push(Value::Integer(i64::from(limit)));
156
157        let mut stmt = conn.prepare(&sql).map_err(to_store_error)?;
158
159        let rows = stmt
160            .query_map(params_from_iter(values.iter()), row_to_event)
161            .map_err(to_store_error)?;
162
163        let mut events = Vec::new();
164        for row in rows {
165            let event = row.map_err(to_store_error)?;
166            events.push(event);
167        }
168
169        Ok(events)
170    }
171}
172
173#[derive(Debug, Clone, Copy)]
174enum InsertMode {
175    Strict,
176    IgnoreDuplicates,
177}
178
179fn insert_event(conn: &mut Connection, event: Event, mode: InsertMode) -> Result<bool> {
180    let tags = serde_json::to_string(&event.tags)
181        .map_err(|err| ShuttleError::Serialization(err.to_string()))?;
182    let metadata_json = serde_json::to_string(&event.metadata_json)
183        .map_err(|err| ShuttleError::Serialization(err.to_string()))?;
184    let insert = match mode {
185        InsertMode::Strict => "INSERT INTO events",
186        InsertMode::IgnoreDuplicates => "INSERT OR IGNORE INTO events",
187    };
188
189    let tx = conn.transaction().map_err(to_store_error)?;
190    let inserted = tx
191        .execute(
192            &format!(
193                r#"
194            {insert} (
195                id, event_type, workspace_id, repo_id, repo_path, git_remote, bit_repo_id, branch, commit_hash, repo_dirty,
196                agent, session_id, title, content, tags, metadata_json, created_at
197            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17)
198            "#
199            ),
200            params![
201                event.id.to_string(),
202                event.event_type.as_str(),
203                &event.workspace_id,
204                &event.repo_id,
205                &event.repo_path,
206                &event.git_remote,
207                &event.bit_repo_id,
208                &event.branch,
209                &event.commit,
210                event.repo_dirty,
211                &event.agent,
212                &event.session_id,
213                &event.title,
214                &event.content,
215                tags,
216                metadata_json,
217                event.created_at.to_rfc3339(),
218            ],
219        )
220        .map_err(to_store_error)?
221        > 0;
222
223    if inserted {
224        for tag in &event.tags {
225            tx.execute(
226                "INSERT OR IGNORE INTO event_tags (event_id, tag) VALUES (?1, ?2)",
227                params![event.id.to_string(), tag],
228            )
229            .map_err(to_store_error)?;
230        }
231    }
232    tx.commit().map_err(to_store_error)?;
233
234    Ok(inserted)
235}
236
237fn row_to_event(row: &rusqlite::Row<'_>) -> rusqlite::Result<Event> {
238    let event_type: String = row.get(1)?;
239    let tags: String = row.get(14)?;
240    let metadata_json: String = row.get(15)?;
241    let created_at: String = row.get(16)?;
242
243    let event_type = EventType::try_from(event_type.as_str()).map_err(to_sql_error)?;
244    let tags = serde_json::from_str(&tags).map_err(to_sql_error)?;
245    let metadata_json = serde_json::from_str(&metadata_json).map_err(to_sql_error)?;
246    let created_at = DateTime::parse_from_rfc3339(&created_at)
247        .map_err(to_sql_error)?
248        .with_timezone(&Utc);
249
250    Ok(Event {
251        id: Uuid::parse_str(&row.get::<_, String>(0)?).map_err(to_sql_error)?,
252        event_type,
253        workspace_id: row.get(2)?,
254        repo_id: row.get(3)?,
255        repo_path: row.get(4)?,
256        git_remote: row.get(5)?,
257        bit_repo_id: row.get(6)?,
258        branch: row.get(7)?,
259        commit: row.get(8)?,
260        repo_dirty: row.get(9)?,
261        agent: row.get(10)?,
262        session_id: row.get(11)?,
263        title: row.get(12)?,
264        content: row.get(13)?,
265        tags,
266        metadata_json,
267        created_at,
268    })
269}
270
271fn filter_tags(filter: &EventFilter) -> Vec<String> {
272    let mut tags = filter.tags.clone();
273    if let Some(tag) = &filter.tag {
274        tags.push(tag.clone());
275    }
276    tags.sort();
277    tags.dedup();
278    tags
279}
280
281fn ensure_column(conn: &Connection, column: &str, column_type: &str) -> Result<()> {
282    let exists = conn
283        .prepare("PRAGMA table_info(events)")
284        .map_err(to_store_error)?
285        .query_map([], |row| row.get::<_, String>(1))
286        .map_err(to_store_error)?
287        .collect::<std::result::Result<Vec<_>, _>>()
288        .map_err(to_store_error)?
289        .iter()
290        .any(|name| name == column);
291
292    if !exists {
293        conn.execute(
294            &format!("ALTER TABLE events ADD COLUMN {column} {column_type}"),
295            [],
296        )
297        .map_err(to_store_error)?;
298    }
299    Ok(())
300}
301
302fn backfill_event_tags(conn: &Connection) -> Result<()> {
303    let mut stmt = conn
304        .prepare("SELECT id, tags FROM events")
305        .map_err(to_store_error)?;
306    let rows = stmt
307        .query_map([], |row| {
308            Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
309        })
310        .map_err(to_store_error)?;
311
312    for row in rows {
313        let (event_id, tags_json) = row.map_err(to_store_error)?;
314        let tags: Vec<String> = serde_json::from_str(&tags_json)
315            .map_err(|err| ShuttleError::Serialization(err.to_string()))?;
316        for tag in tags {
317            conn.execute(
318                "INSERT OR IGNORE INTO event_tags (event_id, tag) VALUES (?1, ?2)",
319                params![event_id, tag],
320            )
321            .map_err(to_store_error)?;
322        }
323    }
324
325    Ok(())
326}
327
328fn to_store_error(err: rusqlite::Error) -> ShuttleError {
329    ShuttleError::Store(err.to_string())
330}
331
332fn to_sql_error<E>(err: E) -> rusqlite::Error
333where
334    E: std::error::Error + Send + Sync + 'static,
335{
336    rusqlite::Error::ToSqlConversionFailure(Box::new(err))
337}
338
339pub fn database_exists(path: impl AsRef<Path>) -> Result<bool> {
340    let conn = Connection::open(path).map_err(to_store_error)?;
341    let exists = conn
342        .query_row(
343            "SELECT name FROM sqlite_master WHERE type = 'table' AND name = 'events'",
344            [],
345            |row| row.get::<_, String>(0),
346        )
347        .optional()
348        .map_err(to_store_error)?
349        .is_some();
350    Ok(exists)
351}
352
353#[cfg(test)]
354mod tests {
355    use crate::core::{Event, NewEvent};
356    use serde_json::json;
357
358    use super::*;
359
360    #[test]
361    fn stores_and_filters_events() {
362        let dir = tempfile::tempdir().unwrap();
363        let store = SqliteEventStore::open(dir.path().join("shuttle.db")).unwrap();
364        let event = Event::new(NewEvent {
365            event_type: EventType::Memory,
366            workspace_id: "workspace".into(),
367            repo_id: None,
368            repo_path: None,
369            git_remote: None,
370            bit_repo_id: None,
371            branch: None,
372            commit: None,
373            repo_dirty: None,
374            agent: "codex".into(),
375            session_id: "session".into(),
376            title: None,
377            content: "SQLite chosen for local-first storage".into(),
378            tags: vec!["storage".into()],
379            metadata_json: json!({}),
380        });
381
382        futures_executor::block_on(store.append(event)).unwrap();
383        let events = futures_executor::block_on(store.list(EventFilter {
384            event_type: Some(EventType::Memory),
385            query: Some("sqlite".into()),
386            ..EventFilter::default()
387        }))
388        .unwrap();
389
390        assert_eq!(events.len(), 1);
391        assert_eq!(events[0].content, "SQLite chosen for local-first storage");
392    }
393
394    #[test]
395    fn stores_metadata_and_filters_normalized_tags() {
396        let dir = tempfile::tempdir().unwrap();
397        let store = SqliteEventStore::open(dir.path().join("shuttle.db")).unwrap();
398        let event = Event::new(NewEvent {
399            event_type: EventType::Message,
400            workspace_id: "workspace".into(),
401            repo_id: None,
402            repo_path: None,
403            git_remote: None,
404            bit_repo_id: None,
405            branch: None,
406            commit: None,
407            repo_dirty: None,
408            agent: "codex".into(),
409            session_id: "session".into(),
410            title: None,
411            content: "hello".into(),
412            tags: vec!["important".into()],
413            metadata_json: json!({ "to": "claude" }),
414        });
415
416        futures_executor::block_on(store.append(event)).unwrap();
417        let tag_events = futures_executor::block_on(store.list(EventFilter {
418            tag: Some("important".into()),
419            ..EventFilter::default()
420        }))
421        .unwrap();
422        let recipient_events = futures_executor::block_on(store.list(EventFilter {
423            recipient: Some("claude".into()),
424            ..EventFilter::default()
425        }))
426        .unwrap();
427
428        assert_eq!(tag_events.len(), 1);
429        assert_eq!(recipient_events.len(), 1);
430        assert_eq!(recipient_events[0].metadata_json["to"], "claude");
431    }
432
433    #[test]
434    fn filters_by_all_requested_tags() {
435        let dir = tempfile::tempdir().unwrap();
436        let store = SqliteEventStore::open(dir.path().join("shuttle.db")).unwrap();
437        let first = Event::new(NewEvent {
438            event_type: EventType::Memory,
439            workspace_id: "workspace".into(),
440            repo_id: None,
441            repo_path: None,
442            git_remote: None,
443            bit_repo_id: None,
444            branch: None,
445            commit: None,
446            repo_dirty: None,
447            agent: "codex".into(),
448            session_id: "session".into(),
449            title: None,
450            content: "storage memory".into(),
451            tags: vec!["storage".into(), "mvp".into()],
452            metadata_json: json!({}),
453        });
454        let second = Event::new(NewEvent {
455            event_type: EventType::Memory,
456            workspace_id: "workspace".into(),
457            repo_id: None,
458            repo_path: None,
459            git_remote: None,
460            bit_repo_id: None,
461            branch: None,
462            commit: None,
463            repo_dirty: None,
464            agent: "codex".into(),
465            session_id: "session".into(),
466            title: None,
467            content: "storage only".into(),
468            tags: vec!["storage".into()],
469            metadata_json: json!({}),
470        });
471
472        futures_executor::block_on(store.append(first)).unwrap();
473        futures_executor::block_on(store.append(second)).unwrap();
474        let events = futures_executor::block_on(store.list(EventFilter {
475            tags: vec!["storage".into(), "mvp".into()],
476            ..EventFilter::default()
477        }))
478        .unwrap();
479
480        assert_eq!(events.len(), 1);
481        assert_eq!(events[0].content, "storage memory");
482    }
483
484    #[test]
485    fn applies_query_filter_before_limit() {
486        let dir = tempfile::tempdir().unwrap();
487        let store = SqliteEventStore::open(dir.path().join("shuttle.db")).unwrap();
488        let old_match = Event::new(NewEvent {
489            event_type: EventType::Memory,
490            workspace_id: "workspace".into(),
491            repo_id: None,
492            repo_path: None,
493            git_remote: None,
494            bit_repo_id: None,
495            branch: None,
496            commit: None,
497            repo_dirty: None,
498            agent: "codex".into(),
499            session_id: "session".into(),
500            title: None,
501            content: "needle memory".into(),
502            tags: Vec::new(),
503            metadata_json: json!({}),
504        });
505        futures_executor::block_on(store.append(old_match)).unwrap();
506        for index in 0..75 {
507            let event = Event::new(NewEvent {
508                event_type: EventType::Memory,
509                workspace_id: "workspace".into(),
510                repo_id: None,
511                repo_path: None,
512                git_remote: None,
513                bit_repo_id: None,
514                branch: None,
515                commit: None,
516                repo_dirty: None,
517                agent: "codex".into(),
518                session_id: "session".into(),
519                title: None,
520                content: format!("recent memory {index}"),
521                tags: Vec::new(),
522                metadata_json: json!({}),
523            });
524            futures_executor::block_on(store.append(event)).unwrap();
525        }
526
527        let events = futures_executor::block_on(store.list(EventFilter {
528            event_type: Some(EventType::Memory),
529            query: Some("needle".into()),
530            limit: Some(1),
531            ..EventFilter::default()
532        }))
533        .unwrap();
534
535        assert_eq!(events.len(), 1);
536        assert_eq!(events[0].content, "needle memory");
537    }
538
539    #[test]
540    fn repo_dirty_round_trips() {
541        let dir = tempfile::tempdir().unwrap();
542        let store = SqliteEventStore::open(dir.path().join("shuttle.db")).unwrap();
543        let event = Event::new(NewEvent {
544            event_type: EventType::Decision,
545            workspace_id: "workspace".into(),
546            repo_id: Some("repo".into()),
547            repo_path: Some("/repo".into()),
548            git_remote: None,
549            bit_repo_id: None,
550            branch: Some("main".into()),
551            commit: Some("abc".into()),
552            repo_dirty: Some(true),
553            agent: "codex".into(),
554            session_id: "session".into(),
555            title: None,
556            content: "dirty decision".into(),
557            tags: Vec::new(),
558            metadata_json: json!({}),
559        });
560
561        futures_executor::block_on(store.append(event)).unwrap();
562        let events = futures_executor::block_on(store.list(EventFilter {
563            event_type: Some(EventType::Decision),
564            ..EventFilter::default()
565        }))
566        .unwrap();
567
568        assert_eq!(events[0].repo_dirty, Some(true));
569    }
570
571    #[test]
572    fn append_if_absent_skips_duplicate_ids() {
573        let dir = tempfile::tempdir().unwrap();
574        let store = SqliteEventStore::open(dir.path().join("shuttle.db")).unwrap();
575        let event = Event::new(NewEvent {
576            event_type: EventType::Memory,
577            workspace_id: "workspace".into(),
578            repo_id: None,
579            repo_path: None,
580            git_remote: None,
581            bit_repo_id: None,
582            branch: None,
583            commit: None,
584            repo_dirty: None,
585            agent: "codex".into(),
586            session_id: "session".into(),
587            title: None,
588            content: "replicated memory".into(),
589            tags: vec!["sync".into()],
590            metadata_json: json!({}),
591        });
592
593        assert!(store.append_if_absent(event.clone()).unwrap());
594        assert!(!store.append_if_absent(event).unwrap());
595        let events = futures_executor::block_on(store.list(EventFilter {
596            event_type: Some(EventType::Memory),
597            ..EventFilter::default()
598        }))
599        .unwrap();
600
601        assert_eq!(events.len(), 1);
602        assert_eq!(events[0].tags, vec!["sync"]);
603    }
604
605    #[test]
606    fn backfills_legacy_tags_and_reads_legacy_recipients() {
607        let dir = tempfile::tempdir().unwrap();
608        let path = dir.path().join("shuttle.db");
609        let conn = Connection::open(&path).unwrap();
610        conn.execute_batch(
611            r#"
612            CREATE TABLE events (
613                id TEXT PRIMARY KEY NOT NULL,
614                event_type TEXT NOT NULL,
615                workspace_id TEXT NOT NULL,
616                repo_id TEXT,
617                repo_path TEXT,
618                git_remote TEXT,
619                bit_repo_id TEXT,
620                branch TEXT,
621                commit_hash TEXT,
622                agent TEXT NOT NULL,
623                session_id TEXT NOT NULL,
624                title TEXT,
625                content TEXT NOT NULL,
626                tags TEXT NOT NULL,
627                created_at TEXT NOT NULL
628            );
629            "#,
630        )
631        .unwrap();
632        conn.execute(
633            r#"
634            INSERT INTO events (
635                id, event_type, workspace_id, repo_id, repo_path, git_remote, bit_repo_id, branch, commit_hash,
636                agent, session_id, title, content, tags, created_at
637            ) VALUES (?1, 'message', 'workspace', NULL, NULL, NULL, NULL, NULL, NULL, 'codex', 'session', NULL, 'legacy', ?2, ?3)
638            "#,
639            params![
640                Uuid::new_v4().to_string(),
641                serde_json::to_string(&vec!["to:claude".to_owned(), "legacy".to_owned()]).unwrap(),
642                Utc::now().to_rfc3339(),
643            ],
644        )
645        .unwrap();
646        drop(conn);
647
648        let store = SqliteEventStore::open(&path).unwrap();
649        let events = futures_executor::block_on(store.list(EventFilter {
650            recipient: Some("claude".into()),
651            ..EventFilter::default()
652        }))
653        .unwrap();
654        let tag_events = futures_executor::block_on(store.list(EventFilter {
655            tag: Some("legacy".into()),
656            ..EventFilter::default()
657        }))
658        .unwrap();
659
660        assert_eq!(events.len(), 1);
661        assert_eq!(events[0].content, "legacy");
662        assert_eq!(tag_events.len(), 1);
663    }
664
665    #[test]
666    fn filters_by_event_id() {
667        let dir = tempfile::tempdir().unwrap();
668        let store = SqliteEventStore::open(dir.path().join("shuttle.db")).unwrap();
669        let first = Event::new(NewEvent {
670            event_type: EventType::Message,
671            workspace_id: "workspace".into(),
672            repo_id: None,
673            repo_path: None,
674            git_remote: None,
675            bit_repo_id: None,
676            branch: None,
677            commit: None,
678            repo_dirty: None,
679            agent: "codex".into(),
680            session_id: "session".into(),
681            title: None,
682            content: "first".into(),
683            tags: Vec::new(),
684            metadata_json: json!({ "to": "claude" }),
685        });
686        let second = Event::new(NewEvent {
687            event_type: EventType::Message,
688            workspace_id: "workspace".into(),
689            repo_id: None,
690            repo_path: None,
691            git_remote: None,
692            bit_repo_id: None,
693            branch: None,
694            commit: None,
695            repo_dirty: None,
696            agent: "codex".into(),
697            session_id: "session".into(),
698            title: None,
699            content: "second".into(),
700            tags: Vec::new(),
701            metadata_json: json!({ "to": "claude" }),
702        });
703        let wanted = second.id;
704
705        futures_executor::block_on(store.append(first)).unwrap();
706        futures_executor::block_on(store.append(second)).unwrap();
707        let events = futures_executor::block_on(store.list(EventFilter {
708            id: Some(wanted),
709            event_type: Some(EventType::Message),
710            ..EventFilter::default()
711        }))
712        .unwrap();
713
714        assert_eq!(events.len(), 1);
715        assert_eq!(events[0].id, wanted);
716        assert_eq!(events[0].content, "second");
717    }
718}