use std::collections::HashMap;
use std::collections::VecDeque;
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use crate::session::{self, SessionInfo, SessionMessage, SqliteMessageTail};
pub struct SessionRow {
pub info: SessionInfo,
pub alive: bool,
pub languages: Vec<String>,
}
pub trait DataSource {
fn list_sessions(&self) -> Result<Vec<SessionRow>>;
fn monitor_messages(&self, session_id: &str) -> Result<Vec<SessionMessage>>;
fn create_message_tail(&self, session_id: &str) -> Result<Box<dyn MessageTail>>;
fn delete_session(&self, session_id: &str) -> Result<()>;
fn list_alive_session_ids(&self) -> Result<Vec<String>>;
}
pub trait MessageTail: Send {
fn try_next_message(&mut self) -> Result<Option<SessionMessage>>;
}
impl MessageTail for SqliteMessageTail {
fn try_next_message(&mut self) -> Result<Option<SessionMessage>> {
self.try_next_message()
}
}
pub struct SqliteDataSource {
conn: rusqlite::Connection,
}
impl SqliteDataSource {
pub fn new() -> Result<Self> {
let path = crate::db::db_path();
let conn = rusqlite::Connection::open_with_flags(
&path,
rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
| rusqlite::OpenFlags::SQLITE_OPEN_URI
| rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX,
)
.with_context(|| {
format!(
"No database found at {}. Is a Catenary session running?",
path.display()
)
})?;
Ok(Self { conn })
}
#[must_use]
pub const fn with_conn(conn: rusqlite::Connection) -> Self {
Self { conn }
}
}
struct RawSessionRow {
id: String,
pid: u32,
display_name: String,
client_name: Option<String>,
client_version: Option<String>,
client_session_id: Option<String>,
started_at_str: String,
db_alive: bool,
}
impl DataSource for SqliteDataSource {
fn list_sessions(&self) -> Result<Vec<SessionRow>> {
let raw = {
let mut stmt = self.conn.prepare(
"SELECT id, pid, display_name, client_name, client_version, \
client_session_id, started_at, alive \
FROM sessions ORDER BY alive DESC, started_at DESC",
)?;
let mut r = stmt.query([])?;
let mut rows = Vec::new();
while let Some(row) = r.next()? {
rows.push(RawSessionRow {
id: row.get(0)?,
pid: row.get(1)?,
display_name: row.get(2)?,
client_name: row.get(3)?,
client_version: row.get(4)?,
client_session_id: row.get(5)?,
started_at_str: row.get(6)?,
db_alive: row.get(7)?,
});
}
rows
};
let mut sessions = Vec::with_capacity(raw.len());
for RawSessionRow {
id,
pid,
display_name,
client_name,
client_version,
client_session_id,
started_at_str,
db_alive,
} in raw
{
let started_at = DateTime::parse_from_rfc3339(&started_at_str)
.with_context(|| format!("invalid started_at: {started_at_str}"))?
.with_timezone(&Utc);
let alive = if db_alive {
if session::is_process_alive(pid) {
true
} else {
let _ = self.conn.execute(
"UPDATE sessions SET alive = 0, ended_at = ?1 WHERE id = ?2",
rusqlite::params![Utc::now().to_rfc3339(), &id],
);
false
}
} else {
false
};
let languages = active_languages_for(&self.conn, &id);
sessions.push(SessionRow {
info: SessionInfo {
id,
pid,
workspace: display_name,
started_at,
client_name,
client_version,
client_session_id,
},
alive,
languages,
});
}
Ok(sessions)
}
fn monitor_messages(&self, session_id: &str) -> Result<Vec<SessionMessage>> {
session::monitor_messages_with_conn(&self.conn, session_id)
}
fn create_message_tail(&self, session_id: &str) -> Result<Box<dyn MessageTail>> {
let tail = session::tail_messages_new(session_id)?;
Ok(Box::new(tail))
}
fn delete_session(&self, session_id: &str) -> Result<()> {
self.conn
.execute("DELETE FROM sessions WHERE id = ?1", [session_id])?;
let socket_dir = session::sessions_dir().join(session_id);
let _ = std::fs::remove_dir_all(&socket_dir);
Ok(())
}
fn list_alive_session_ids(&self) -> Result<Vec<String>> {
let mut stmt = self
.conn
.prepare("SELECT id FROM sessions WHERE alive = 1")?;
let mut rows = stmt.query([])?;
let mut ids = Vec::new();
while let Some(row) = rows.next()? {
ids.push(row.get(0)?);
}
Ok(ids)
}
}
fn active_languages_for(conn: &rusqlite::Connection, session_id: &str) -> Vec<String> {
let Ok(mut stmt) = conn.prepare(
"SELECT DISTINCT server FROM messages \
WHERE session_id = ?1 AND type = 'lsp' \
ORDER BY server",
) else {
return vec![];
};
let Ok(mut rows) = stmt.query([session_id]) else {
return vec![];
};
let mut result = Vec::new();
while let Ok(Some(row)) = rows.next() {
if let Ok(server) = row.get::<_, String>(0) {
result.push(server);
}
}
result
}
pub struct MockDataSource {
pub sessions: Vec<SessionRow>,
pub messages: HashMap<String, Vec<SessionMessage>>,
pub tail_messages: HashMap<String, VecDeque<SessionMessage>>,
}
impl DataSource for MockDataSource {
fn list_sessions(&self) -> Result<Vec<SessionRow>> {
let rows = self
.sessions
.iter()
.map(|r| SessionRow {
info: r.info.clone(),
alive: r.alive,
languages: r.languages.clone(),
})
.collect();
Ok(rows)
}
fn monitor_messages(&self, session_id: &str) -> Result<Vec<SessionMessage>> {
self.messages
.get(session_id)
.cloned()
.ok_or_else(|| anyhow::anyhow!("Session not found: {session_id}"))
}
fn create_message_tail(&self, session_id: &str) -> Result<Box<dyn MessageTail>> {
let messages = self
.tail_messages
.get(session_id)
.cloned()
.unwrap_or_default();
Ok(Box::new(MockMessageTail { messages }))
}
fn delete_session(&self, _session_id: &str) -> Result<()> {
Ok(())
}
fn list_alive_session_ids(&self) -> Result<Vec<String>> {
Ok(self
.sessions
.iter()
.filter(|r| r.alive)
.map(|r| r.info.id.clone())
.collect())
}
}
pub struct MockMessageTail {
messages: VecDeque<SessionMessage>,
}
impl MockMessageTail {
#[must_use]
pub const fn new(messages: VecDeque<SessionMessage>) -> Self {
Self { messages }
}
}
impl MessageTail for MockMessageTail {
fn try_next_message(&mut self) -> Result<Option<SessionMessage>> {
Ok(self.messages.pop_front())
}
}
#[cfg(test)]
#[allow(
clippy::expect_used,
reason = "tests use expect for readable assertions"
)]
mod tests {
use super::*;
use chrono::Utc;
use crate::session::SessionInfo;
fn test_db() -> (tempfile::TempDir, std::path::PathBuf, rusqlite::Connection) {
let dir = tempfile::tempdir().expect("failed to create tempdir for test DB");
let path = dir.path().join("catenary").join("catenary.db");
let conn = crate::db::open_and_migrate_at(&path).expect("failed to open test DB");
(dir, path, conn)
}
fn make_session_info(id: &str) -> SessionInfo {
SessionInfo {
id: id.to_string(),
pid: 1234,
workspace: "/tmp/test".to_string(),
started_at: Utc::now(),
client_name: None,
client_version: None,
client_session_id: None,
}
}
fn make_message(method: &str) -> SessionMessage {
SessionMessage {
id: 0,
r#type: "lsp".to_string(),
method: method.to_string(),
server: "rust-analyzer".to_string(),
client: "catenary".to_string(),
request_id: None,
parent_id: None,
timestamp: Utc::now(),
payload: serde_json::Value::Object(serde_json::Map::new()),
}
}
#[test]
fn test_mock_data_source_list_sessions() -> Result<()> {
let ds = MockDataSource {
sessions: vec![
SessionRow {
info: make_session_info("active-1"),
alive: true,
languages: vec!["rust".to_string()],
},
SessionRow {
info: make_session_info("dead-1"),
alive: false,
languages: vec![],
},
],
messages: HashMap::new(),
tail_messages: HashMap::new(),
};
let rows = ds.list_sessions()?;
assert_eq!(rows.len(), 2);
assert_eq!(rows[0].info.id, "active-1");
assert!(rows[0].alive);
assert_eq!(rows[0].languages, vec!["rust".to_string()]);
assert_eq!(rows[1].info.id, "dead-1");
assert!(!rows[1].alive);
Ok(())
}
#[test]
fn test_mock_data_source_monitor_messages() -> Result<()> {
let messages = vec![
make_message("initialize"),
make_message("textDocument/hover"),
make_message("textDocument/definition"),
];
let mut messages_map = HashMap::new();
messages_map.insert("abc".to_string(), messages);
let ds = MockDataSource {
sessions: vec![],
messages: messages_map,
tail_messages: HashMap::new(),
};
let result = ds.monitor_messages("abc")?;
assert_eq!(result.len(), 3);
let err = ds.monitor_messages("nonexistent");
assert!(err.is_err());
Ok(())
}
#[test]
fn test_mock_message_tail_drains() -> Result<()> {
let mut messages = VecDeque::new();
messages.push_back(make_message("initialize"));
messages.push_back(make_message("shutdown"));
let mut tail = MockMessageTail::new(messages);
assert!(tail.try_next_message()?.is_some());
assert!(tail.try_next_message()?.is_some());
assert!(tail.try_next_message()?.is_none());
Ok(())
}
fn create_session(
db_path: &std::path::Path,
workspace: &str,
) -> Result<crate::session::Session> {
let arc = std::sync::Arc::new(std::sync::Mutex::new(crate::db::open_and_migrate_at(
db_path,
)?));
crate::session::Session::create_with_conn(workspace, arc)
}
#[test]
fn test_sqlite_data_source_list_sessions() -> Result<()> {
let (_dir, path, conn) = test_db();
let ds = SqliteDataSource::with_conn(conn);
let session = create_session(&path, "/tmp/test-ds-list")?;
let id = session.info.id.clone();
let rows = ds.list_sessions()?;
assert!(rows.iter().any(|r| r.info.id == id));
drop(session);
ds.delete_session(&id)?;
Ok(())
}
#[test]
fn test_sqlite_data_source_monitor_messages() -> Result<()> {
let (_dir, path, conn) = test_db();
let ds = SqliteDataSource::with_conn(conn);
let session = create_session(&path, "/tmp/test-ds-messages")?;
let id = session.info.id.clone();
session.message_log().log(
"lsp",
"textDocument/hover",
"rust-analyzer",
"catenary",
None,
None,
&serde_json::json!({}),
);
let messages = ds.monitor_messages(&id)?;
assert!(!messages.is_empty(), "should have at least one message");
drop(session);
ds.delete_session(&id)?;
Ok(())
}
#[test]
fn test_sqlite_message_tail_streams() -> Result<()> {
let (_dir, path, conn) = test_db();
let session = create_session(&path, "/tmp/test-ds-tail")?;
let id = session.info.id.clone();
let tail_conn = crate::db::open_at(&path)?;
let mut tail = crate::session::tail_messages_new_with_conn(tail_conn, &id)?;
assert!(
tail.try_next_message()?.is_none(),
"should have no messages initially"
);
session.message_log().log(
"lsp",
"textDocument/hover",
"rust-analyzer",
"catenary",
None,
None,
&serde_json::json!({}),
);
let msg = tail.try_next_message()?;
assert!(msg.is_some(), "should see newly logged message");
assert!(tail.try_next_message()?.is_none());
drop(session);
conn.execute("DELETE FROM sessions WHERE id = ?1", [&id])?;
Ok(())
}
#[test]
fn test_sqlite_data_source_delete_session() -> Result<()> {
let (_dir, path, conn) = test_db();
let ds = SqliteDataSource::with_conn(conn);
let session = create_session(&path, "/tmp/test-ds-delete")?;
let id = session.info.id.clone();
drop(session);
assert!(ds.list_sessions()?.iter().any(|r| r.info.id == id));
ds.delete_session(&id)?;
assert!(!ds.list_sessions()?.iter().any(|r| r.info.id == id));
Ok(())
}
#[test]
fn test_mock_list_alive_session_ids() -> Result<()> {
let ds = MockDataSource {
sessions: vec![
SessionRow {
info: make_session_info("alive-1"),
alive: true,
languages: vec![],
},
SessionRow {
info: make_session_info("dead-1"),
alive: false,
languages: vec![],
},
SessionRow {
info: make_session_info("alive-2"),
alive: true,
languages: vec![],
},
],
messages: HashMap::new(),
tail_messages: HashMap::new(),
};
let ids = ds.list_alive_session_ids()?;
assert_eq!(ids.len(), 2);
assert!(ids.contains(&"alive-1".to_string()));
assert!(ids.contains(&"alive-2".to_string()));
assert!(!ids.contains(&"dead-1".to_string()));
Ok(())
}
#[test]
fn test_sqlite_list_alive_session_ids() -> Result<()> {
let (_dir, path, conn) = test_db();
let ds = SqliteDataSource::with_conn(conn);
let session = create_session(&path, "/tmp/test-ds-alive-ids")?;
let id = session.info.id.clone();
let ids = ds.list_alive_session_ids()?;
assert!(ids.contains(&id), "alive session should appear");
drop(session);
ds.delete_session(&id)?;
Ok(())
}
}