use crate::adapters::{Adapter, AdapterError, RawRecord};
use crate::storage::LedgerRow;
use rusqlite::{Connection, OpenFlags, OptionalExtension};
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use std::time::Duration;
/// Largest single `ItemTable` value, in bytes, that `read_item_value` accepts (64 MiB).
/// Values longer than this are rejected with a "value exceeds size cap" parse error.
const MAX_BYTES_PER_VALUE: usize = 64 * 1024 * 1024;
/// Serializable resume position passed to and returned from `read_new_records`.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct CursorCursor {
/// Database file to read. An empty path makes `read_new_records` fall back to the
/// adapter's own `state.vscdb` location.
pub db_path: PathBuf,
/// Running counter used to number emitted records; each read advances it by the
/// number of records returned.
pub last_rowid: i64,
/// Greatest message id emitted so far. Only messages whose id compares greater
/// (as a string) are returned by the next read.
pub last_msg_id: String,
}
/// Adapter that reads chat history from a Cursor `state.vscdb` SQLite database.
pub struct CursorAdapter {
/// Directory expected to contain `state.vscdb`. When `None`, the directory is derived
/// from the user's home directory using a per-platform layout.
pub db_root: Option<PathBuf>,
}
impl CursorAdapter {
pub fn new() -> Self {
Self { db_root: None }
}
pub fn with_db_root(root: PathBuf) -> Self {
Self {
db_root: Some(root),
}
}
fn resolve_db_root(&self) -> Result<PathBuf, AdapterError> {
if let Some(p) = &self.db_root {
return Ok(p.clone());
}
let home =
dirs::home_dir().ok_or_else(|| AdapterError::PathNotFound(PathBuf::from("$HOME")))?;
#[cfg(target_os = "linux")]
{
Ok(home
.join(".config")
.join("Cursor")
.join("User")
.join("globalStorage"))
}
#[cfg(target_os = "macos")]
{
Ok(home
.join("Library")
.join("Application Support")
.join("Cursor")
.join("User")
.join("globalStorage"))
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
{
Err(AdapterError::PathNotFound(PathBuf::from(
"Cursor globalStorage (unsupported platform for v0.1)",
)))
}
}
fn db_path(&self) -> Result<PathBuf, AdapterError> {
Ok(self.resolve_db_root()?.join("state.vscdb"))
}
}
impl Default for CursorAdapter {
fn default() -> Self {
Self::new()
}
}
impl Adapter for CursorAdapter {
    type Cursor = CursorCursor;

    /// Identifier of this adapter; also written into every record's `tool` field.
    fn name(&self) -> &'static str {
        "cursor"
    }

    /// Returns the database path when the file exists, `None` otherwise.
    ///
    /// Failure to resolve the path (missing home directory, unsupported platform)
    /// is deliberately reported as "not detected" rather than as an error.
    fn detect(&self) -> Result<Option<PathBuf>, AdapterError> {
        Ok(self.db_path().ok().filter(|candidate| candidate.exists()))
    }

    /// Reads every message stored under the three Cursor keys and returns the ones
    /// whose id is greater than `since.last_msg_id`, together with the advanced cursor.
    ///
    /// When `since.db_path` is non-empty it must resolve to a file whose parent
    /// directory lies inside the adapter's database root. The canonicalized path is
    /// the one actually opened, so the file that was validated is the file that is
    /// read. The returned cursor keeps the caller's original `db_path` spelling.
    ///
    /// # Errors
    ///
    /// * `AdapterError::PathNotFound` — the root or database path cannot be resolved,
    ///   or the database lies outside the root.
    /// * `AdapterError::Sqlite` / `AdapterError::WalCopyFailed` — the database cannot
    ///   be opened, directly or through the temp-copy fallback.
    /// * `AdapterError::Parse` — a stored value is malformed, or a message cannot be
    ///   serialized into a record payload.
    fn read_new_records(
        &self,
        since: &CursorCursor,
    ) -> Result<(Vec<RawRecord>, CursorCursor), AdapterError> {
        // `live_db` is what gets recorded in the returned cursor; `open_path` is
        // what is handed to SQLite.
        let (live_db, open_path) = if since.db_path.as_os_str().is_empty() {
            let default_db = self.db_path()?;
            (default_db.clone(), default_db)
        } else {
            let db_root = self.resolve_db_root()?;
            let canonical_root = db_root
                .canonicalize()
                .map_err(|_| AdapterError::PathNotFound(db_root.clone()))?;
            let canonical_db = since
                .db_path
                .canonicalize()
                .map_err(|_| AdapterError::PathNotFound(since.db_path.clone()))?;
            let inside_root = canonical_db
                .parent()
                .ok_or_else(|| AdapterError::PathNotFound(since.db_path.clone()))?
                .starts_with(&canonical_root);
            if !inside_root {
                return Err(AdapterError::PathNotFound(since.db_path.clone()));
            }
            // Open the path that passed the check, not the unresolved one, so a
            // symlink swapped in after validation cannot redirect the read.
            (since.db_path.clone(), canonical_db)
        };

        // `_temp_guard` keeps the fallback temp directory alive while `conn` is in use.
        let (conn, _temp_guard) = open_with_fallback(&open_path)?;

        let mut fresh: Vec<ParsedMsg> = Vec::new();
        fresh.extend(read_generations(&conn)?);
        fresh.extend(read_prompts(&conn)?);
        fresh.extend(read_composer_data(&conn)?);
        // NOTE(review): ids are compared as plain strings, so a new message whose id
        // sorts at or below the stored high-water mark is never emitted. Left as-is
        // because persisted cursors depend on this ordering — confirm with the owner.
        fresh.retain(|m| m.msg_id > since.last_msg_id);

        let records = fresh
            .iter()
            .enumerate()
            .map(|(i, m)| {
                // Saturating arithmetic: a corrupt cursor near `i64::MAX` must not
                // panic. A negative `last_rowid` still wraps in the cast, as before.
                let offset = since.last_rowid.saturating_add(1).saturating_add(i as i64) as u64;
                // Surface serialization failures instead of emitting an empty
                // payload that would only fail later in `parse`.
                let payload = serde_json::to_vec(&m.value).map_err(|e| AdapterError::Parse {
                    offset,
                    context: "failed to serialize cursor message",
                    source: e,
                })?;
                Ok(RawRecord {
                    tool: "cursor".to_string(),
                    payload,
                    offset,
                })
            })
            .collect::<Result<Vec<RawRecord>, AdapterError>>()?;

        let new_last_msg_id = fresh
            .iter()
            .map(|m| m.msg_id.as_str())
            .max()
            .map(|s| s.to_string())
            .unwrap_or_else(|| since.last_msg_id.clone());
        let new_rowid = since.last_rowid.saturating_add(fresh.len() as i64);

        let advanced = CursorCursor {
            db_path: live_db,
            last_rowid: new_rowid,
            last_msg_id: new_last_msg_id,
        };
        Ok((records, advanced))
    }

    /// Converts raw records into ledger rows.
    ///
    /// Each payload must be a JSON object with a string `sessionId`, a numeric `ts`
    /// that fits in `i64`, and a string `role`. `content` is optional: a missing
    /// value becomes the empty string and a non-string value is stored as its JSON
    /// text.
    ///
    /// # Errors
    ///
    /// `AdapterError::Parse` carrying the record's offset for the first record
    /// that violates the shape above; no rows are returned in that case.
    fn parse(&self, records: Vec<RawRecord>) -> Result<Vec<LedgerRow>, AdapterError> {
        let mut rows = Vec::with_capacity(records.len());
        for rec in records {
            let v: serde_json::Value =
                serde_json::from_slice(&rec.payload).map_err(|e| AdapterError::Parse {
                    offset: rec.offset,
                    context: "cursor payload is not valid JSON",
                    source: e,
                })?;

            // Field checks run in the order sessionId, ts, role; the first failure wins.
            let session_id = v
                .get("sessionId")
                .and_then(|s| s.as_str())
                .ok_or_else(|| AdapterError::Parse {
                    offset: rec.offset,
                    context: "missing sessionId field",
                    source: make_missing_field_error(),
                })?
                .to_string();
            let ts = extract_ts(&v, rec.offset)?;
            let role = v
                .get("role")
                .and_then(|r| r.as_str())
                .ok_or_else(|| AdapterError::Parse {
                    offset: rec.offset,
                    context: "missing role field",
                    source: make_missing_field_error(),
                })?
                .to_string();

            let content = match v.get("content") {
                None => String::new(),
                Some(serde_json::Value::String(s)) => s.clone(),
                Some(other) => serde_json::to_string(other).map_err(|e| AdapterError::Parse {
                    offset: rec.offset,
                    context: "failed to serialize content field",
                    source: e,
                })?,
            };

            rows.push(LedgerRow {
                session_id,
                tool: "cursor".to_string(),
                ts,
                role,
                content,
                tool_calls_json: None,
                files_touched_json: None,
                parent_id: None,
            });
        }
        Ok(rows)
    }
}
/// One message extracted from the database, before conversion into a `RawRecord`.
#[derive(Debug)]
struct ParsedMsg {
/// Synthetic id built by the `read_*` helpers (for example `"<generationId>:user"`);
/// compared against the cursor's `last_msg_id` to decide whether the message is new.
msg_id: String,
/// JSON object with `sessionId`, `role`, `content` and `ts` keys; serialized as the
/// record payload.
value: serde_json::Value,
}
/// Opens `db_path` read-only (no-mutex, URI filenames enabled) and sets a 200 ms
/// busy timeout on the resulting connection.
fn attempt_open(db_path: &Path) -> Result<Connection, rusqlite::Error> {
    let flags = OpenFlags::SQLITE_OPEN_READ_ONLY
        | OpenFlags::SQLITE_OPEN_NO_MUTEX
        | OpenFlags::SQLITE_OPEN_URI;
    let busy_wait = Duration::from_millis(200);

    Connection::open_with_flags(db_path, flags).and_then(|conn| {
        conn.busy_timeout(busy_wait)?;
        Ok(conn)
    })
}
/// Copies the database, plus its `-wal` and `-shm` sidecar files when present, into a
/// fresh owner-only temporary directory.
///
/// Returns the temp-directory guard and the path of the copied database, which is
/// always named `state.vscdb` inside that directory. The copy is removed when the
/// guard is dropped.
///
/// Sidecar paths are formed by appending `-wal` / `-shm` to the complete database
/// file name, which is how SQLite names them. Replacing the extension instead would
/// only be correct for files that happen to end in `.vscdb`.
///
/// # Errors
///
/// `AdapterError::WalCopyFailed` when the temp directory cannot be created or any
/// copy or permission change fails.
fn copy_db_to_temp(db_path: &Path) -> Result<(tempfile::TempDir, PathBuf), AdapterError> {
    let tmp = build_owner_only_tempdir()?;
    let dst = tmp.path().join("state.vscdb");
    std::fs::copy(db_path, &dst).map_err(AdapterError::WalCopyFailed)?;
    chmod_owner_only(&dst)?;

    for suffix in ["-wal", "-shm"] {
        // Append to the whole path so `foo.db` maps to `foo.db-wal`, not `foo.vscdb-wal`.
        let mut sidecar_name = db_path.as_os_str().to_os_string();
        sidecar_name.push(suffix);
        let sidecar = PathBuf::from(sidecar_name);
        if !sidecar.exists() {
            continue;
        }
        // The destination sidecar must sit next to, and be named after, the copied database.
        let dst_sidecar = tmp.path().join(format!("state.vscdb{suffix}"));
        std::fs::copy(&sidecar, &dst_sidecar).map_err(AdapterError::WalCopyFailed)?;
        chmod_owner_only(&dst_sidecar)?;
    }

    Ok((tmp, dst))
}
/// Creates a temporary directory; on Unix it is created with mode `0o700`.
///
/// Any creation failure is reported as `AdapterError::WalCopyFailed`.
fn build_owner_only_tempdir() -> Result<tempfile::TempDir, AdapterError> {
    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt;
        let owner_only = std::fs::Permissions::from_mode(0o700);
        let mut builder = tempfile::Builder::new();
        builder.permissions(owner_only);
        builder.tempdir().map_err(AdapterError::WalCopyFailed)
    }
    #[cfg(not(unix))]
    {
        // No mode bits to set here; the platform default applies.
        tempfile::tempdir().map_err(AdapterError::WalCopyFailed)
    }
}
/// Restricts `path` to mode `0o600` on Unix; does nothing on other platforms.
///
/// A failed permission change is reported as `AdapterError::WalCopyFailed`.
fn chmod_owner_only(path: &Path) -> Result<(), AdapterError> {
    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt;
        let owner_rw = std::fs::Permissions::from_mode(0o600);
        std::fs::set_permissions(path, owner_rw).map_err(AdapterError::WalCopyFailed)
    }
    #[cfg(not(unix))]
    {
        // Silence the unused-parameter warning where there is nothing to do.
        let _ = path;
        Ok(())
    }
}
fn is_busy(err: &rusqlite::Error) -> bool {
use rusqlite::ffi::ErrorCode;
matches!(
err,
rusqlite::Error::SqliteFailure(e, _)
if e.code == ErrorCode::DatabaseBusy || e.code == ErrorCode::SystemIoFailure
)
}
/// Opens the database, retrying on busy-style failures and finally falling back to
/// a private copy.
///
/// Up to three direct attempts are made. After each busy failure the thread sleeps
/// 50, 200 and 500 ms respectively; any other failure is returned immediately. If
/// all three attempts are busy, the database is copied to a temp directory and the
/// copy is opened. The second tuple element is `Some` only in that fallback case
/// and must be kept alive for as long as the connection is used.
fn open_with_fallback(
    db_path: &Path,
) -> Result<(Connection, Option<tempfile::TempDir>), AdapterError> {
    const RETRY_DELAYS_MS: [u64; 3] = [50, 200, 500];

    for pause in RETRY_DELAYS_MS.iter().copied().map(Duration::from_millis) {
        let failure = match attempt_open(db_path) {
            Ok(conn) => return Ok((conn, None)),
            Err(failure) => failure,
        };
        if !is_busy(&failure) {
            return Err(AdapterError::Sqlite(failure));
        }
        std::thread::sleep(pause);
    }

    let (tmp, copied_path) = copy_db_to_temp(db_path)?;
    match attempt_open(&copied_path) {
        Ok(conn) => Ok((conn, Some(tmp))),
        Err(failure) => Err(AdapterError::Sqlite(failure)),
    }
}
fn read_item_value(conn: &Connection, key: &str) -> Result<Option<String>, AdapterError> {
let mut stmt = conn
.prepare("SELECT value FROM ItemTable WHERE key = ?1")
.map_err(AdapterError::Sqlite)?;
let result: Option<String> = stmt
.query_row([key], |row| {
use rusqlite::types::ValueRef;
let bytes: Vec<u8> = match row.get_ref(0)? {
ValueRef::Text(t) => t.to_vec(),
ValueRef::Blob(b) => b.to_vec(),
ValueRef::Null => return Ok(None),
_ => {
return Err(rusqlite::Error::InvalidColumnType(
0,
"value".to_string(),
rusqlite::types::Type::Blob,
))
}
};
Ok(Some(bytes))
})
.optional()
.map_err(AdapterError::Sqlite)?
.flatten()
.map(|bytes| {
if bytes.len() > MAX_BYTES_PER_VALUE {
Err(bytes)
} else {
Ok(bytes)
}
})
.transpose()
.map_err(|oversized| {
let _ = oversized;
let dummy_err = serde_json::from_str::<serde_json::Value>("").unwrap_err();
AdapterError::Parse {
offset: 0,
context: "value exceeds size cap",
source: dummy_err,
}
})?
.map(|bytes| {
String::from_utf8(bytes).map_err(|e| AdapterError::Parse {
offset: 0,
context: "ItemTable value is not valid UTF-8",
source: serde_json::from_str::<serde_json::Value>(&e.to_string()).unwrap_err(),
})
})
.transpose()?;
Ok(result)
}
/// Reads the `aiService.generations` key and expands every entry into two messages:
/// a `user` message (from `userMessage` / `requestTs`) followed by an `assistant`
/// message (from `assistantMessage` / `responseTs`).
///
/// A missing key yields an empty list. Absent or mistyped fields fall back to the
/// empty string or `0`. The stored value must be a JSON array, otherwise
/// `AdapterError::Parse` is returned.
fn read_generations(conn: &Connection) -> Result<Vec<ParsedMsg>, AdapterError> {
    let raw = match read_item_value(conn, "aiService.generations")? {
        Some(raw) => raw,
        None => return Ok(Vec::new()),
    };
    let generations: Vec<serde_json::Value> =
        serde_json::from_str(&raw).map_err(|source| AdapterError::Parse {
            offset: 0,
            context: "aiService.generations is not a JSON array",
            source,
        })?;

    // Field accessors with the same defaults for every lookup.
    let text_of = |entry: &serde_json::Value, key: &str| -> String {
        entry
            .get(key)
            .and_then(serde_json::Value::as_str)
            .unwrap_or_default()
            .to_owned()
    };
    let int_of = |entry: &serde_json::Value, key: &str| -> i64 {
        entry
            .get(key)
            .and_then(serde_json::Value::as_i64)
            .unwrap_or_default()
    };

    // (role, content field, timestamp field) — user turn first, then assistant.
    let turns = [
        ("user", "userMessage", "requestTs"),
        ("assistant", "assistantMessage", "responseTs"),
    ];

    let mut messages = Vec::with_capacity(generations.len() * 2);
    for entry in &generations {
        let generation_id = text_of(entry, "generationId");
        let session = text_of(entry, "sessionId");
        for (role, content_key, ts_key) in turns {
            let body = text_of(entry, content_key);
            let stamp = int_of(entry, ts_key);
            messages.push(ParsedMsg {
                msg_id: format!("{generation_id}:{role}"),
                value: serde_json::json!({
                    "sessionId": session,
                    "role": role,
                    "content": body,
                    "ts": stamp,
                }),
            });
        }
    }
    Ok(messages)
}
/// Reads the `aiService.prompts` key and turns every entry into one `user` message
/// whose content is the entry's `text` field.
///
/// A missing key yields an empty list. Absent or mistyped fields fall back to the
/// empty string or `0`. The stored value must be a JSON array, otherwise
/// `AdapterError::Parse` is returned.
fn read_prompts(conn: &Connection) -> Result<Vec<ParsedMsg>, AdapterError> {
    let raw = match read_item_value(conn, "aiService.prompts")? {
        Some(raw) => raw,
        None => return Ok(Vec::new()),
    };
    let prompts: Vec<serde_json::Value> =
        serde_json::from_str(&raw).map_err(|source| AdapterError::Parse {
            offset: 0,
            context: "aiService.prompts is not a JSON array",
            source,
        })?;

    let messages = prompts
        .iter()
        .map(|entry| {
            let text_of = |key: &str| -> String {
                entry
                    .get(key)
                    .and_then(serde_json::Value::as_str)
                    .unwrap_or_default()
                    .to_owned()
            };
            let prompt_id = text_of("promptId");
            let session = text_of("sessionId");
            let stamp = entry
                .get("ts")
                .and_then(serde_json::Value::as_i64)
                .unwrap_or_default();
            let body = text_of("text");
            ParsedMsg {
                msg_id: format!("{prompt_id}:user"),
                value: serde_json::json!({
                    "sessionId": session,
                    "role": "user",
                    "content": body,
                    "ts": stamp,
                }),
            }
        })
        .collect();
    Ok(messages)
}
/// Reads the `composer.composerData` key and flattens every composer's `messages`
/// array into individual messages with ids of the form `"<composerId>:msg:<index>"`.
///
/// A missing key, a document without a `composers` array, and composers without a
/// `messages` array all contribute nothing. A missing `role` defaults to `"user"`,
/// a missing `ts` to `0`, and non-string `content` is stored as its JSON text.
/// The stored value must be valid JSON, otherwise `AdapterError::Parse` is returned.
///
/// NOTE(review): the index in the id is not zero-padded, so `…:msg:10` sorts before
/// `…:msg:9` as a string. Callers that order ids lexicographically should confirm
/// this is acceptable.
fn read_composer_data(conn: &Connection) -> Result<Vec<ParsedMsg>, AdapterError> {
    let raw = match read_item_value(conn, "composer.composerData")? {
        Some(raw) => raw,
        None => return Ok(Vec::new()),
    };
    let document: serde_json::Value =
        serde_json::from_str(&raw).map_err(|source| AdapterError::Parse {
            offset: 0,
            context: "composer.composerData is not valid JSON",
            source,
        })?;
    let composers = match document
        .get("composers")
        .and_then(serde_json::Value::as_array)
    {
        Some(list) => list,
        None => return Ok(Vec::new()),
    };

    let text_or = |node: &serde_json::Value, key: &str, fallback: &str| -> String {
        node.get(key)
            .and_then(serde_json::Value::as_str)
            .unwrap_or(fallback)
            .to_owned()
    };

    let mut collected = Vec::new();
    for composer in composers {
        let thread = match composer
            .get("messages")
            .and_then(serde_json::Value::as_array)
        {
            Some(thread) => thread,
            None => continue,
        };
        let composer_id = text_or(composer, "composerId", "");
        let session = text_or(composer, "sessionId", "");

        for (position, entry) in thread.iter().enumerate() {
            let role = text_or(entry, "role", "user");
            // Strings are taken verbatim; any other JSON value is rendered as JSON text.
            let body = entry
                .get("content")
                .map(|content| match content.as_str() {
                    Some(text) => text.to_owned(),
                    None => content.to_string(),
                })
                .unwrap_or_default();
            let stamp = entry
                .get("ts")
                .and_then(serde_json::Value::as_i64)
                .unwrap_or_default();
            collected.push(ParsedMsg {
                msg_id: format!("{composer_id}:msg:{position}"),
                value: serde_json::json!({
                    "sessionId": session,
                    "role": role,
                    "content": body,
                    "ts": stamp,
                }),
            });
        }
    }
    Ok(collected)
}
/// Extracts the `ts` field of `v` as an `i64`.
///
/// Fails with `AdapterError::Parse` (tagged with `offset`) when the field is absent,
/// is not a JSON number, or is a number that does not fit in `i64`.
fn extract_ts(v: &serde_json::Value, offset: u64) -> Result<i64, AdapterError> {
    let number = match v.get("ts") {
        Some(serde_json::Value::Number(number)) => number,
        _ => {
            return Err(AdapterError::Parse {
                offset,
                context: "missing or non-numeric ts field",
                source: make_missing_field_error(),
            })
        }
    };
    match number.as_i64() {
        Some(ts) => Ok(ts),
        None => Err(AdapterError::Parse {
            offset,
            context: "ts field is not a valid i64",
            source: make_missing_field_error(),
        }),
    }
}
fn make_missing_field_error() -> serde_json::Error {
serde_json::from_str::<serde_json::Value>("").unwrap_err()
}
// Unit tests for the Cursor adapter. Several tests read a fixture database under
// `tests/fixtures/cursor`; the rest build throwaway databases in temp directories.
#[cfg(test)]
mod tests {
use super::*;
use crate::adapters::Adapter;
// Directory holding the checked-in fixture database, resolved at compile time.
const FIXTURE_DIR: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/tests/fixtures/cursor");
// Path of the fixture database file inside `FIXTURE_DIR`.
fn fixture_db_path() -> PathBuf {
PathBuf::from(FIXTURE_DIR).join("1-state.vscdb")
}
// Adapter rooted at the fixture directory so the containment check accepts the fixture.
fn fixture_adapter() -> CursorAdapter {
CursorAdapter::with_db_root(PathBuf::from(FIXTURE_DIR))
}
// Cursor pointing at `db_path` with nothing consumed yet.
fn empty_cursor(db_path: PathBuf) -> CursorCursor {
CursorCursor {
db_path,
last_rowid: 0,
last_msg_id: String::new(),
}
}
// The fixture must yield assistant rows with non-empty session ids, including
// the session `cursor-session-001`.
#[test]
fn parses_aiservice_generations() {
let adapter = fixture_adapter();
let since = empty_cursor(fixture_db_path());
let (records, _cursor) = adapter.read_new_records(&since).unwrap();
let rows = adapter.parse(records).unwrap();
let assistant_rows: Vec<_> = rows.iter().filter(|r| r.role == "assistant").collect();
assert!(
!assistant_rows.is_empty(),
"expected at least one assistant row from generations"
);
for row in &assistant_rows {
assert!(!row.session_id.is_empty(), "session_id must not be empty");
assert_eq!(row.tool, "cursor");
}
assert!(
rows.iter().any(|r| r.session_id == "cursor-session-001"),
"expected cursor-session-001 in results"
);
}
// The fixture must yield at least one user row whose content mentions the CSS prompt.
#[test]
fn parses_aiservice_prompts() {
let adapter = fixture_adapter();
let since = empty_cursor(fixture_db_path());
let (records, _cursor) = adapter.read_new_records(&since).unwrap();
let rows = adapter.parse(records).unwrap();
let user_rows: Vec<_> = rows.iter().filter(|r| r.role == "user").collect();
assert!(
!user_rows.is_empty(),
"expected at least one user row from prompts"
);
let css_row = user_rows
.iter()
.any(|r| r.content.contains("CSS") || r.content.contains("center a div"));
assert!(css_row, "expected CSS prompt content in user rows");
}
// Rows from all three keys are combined: at least six rows spanning at least two sessions.
#[test]
fn merges_three_keys() {
let adapter = fixture_adapter();
let since = empty_cursor(fixture_db_path());
let (records, _cursor) = adapter.read_new_records(&since).unwrap();
let rows = adapter.parse(records).unwrap();
assert!(
rows.len() >= 6,
"expected at least 6 rows (3 keys merged), got {}",
rows.len()
);
let sessions: std::collections::HashSet<_> =
rows.iter().map(|r| r.session_id.as_str()).collect();
assert!(
sessions.len() >= 2,
"expected rows from at least 2 sessions"
);
}
// A second read using the cursor returned by the first must return nothing and
// leave the cursor unchanged.
#[test]
fn cursor_advances_monotonically() {
let adapter = fixture_adapter();
let since0 = empty_cursor(fixture_db_path());
let (records1, cursor1) = adapter.read_new_records(&since0).unwrap();
assert!(!records1.is_empty(), "first read must return records");
assert!(
cursor1.last_rowid >= since0.last_rowid,
"cursor must not regress"
);
let (records2, cursor2) = adapter.read_new_records(&cursor1).unwrap();
assert!(
records2.is_empty(),
"second read with advanced cursor must return nothing"
);
assert_eq!(
cursor2.last_rowid, cursor1.last_rowid,
"cursor must be stable when no new records"
);
assert_eq!(cursor2.last_msg_id, cursor1.last_msg_id);
}
// Ignored by default (see the attribute text). Holds an exclusive transaction on a
// copy of the fixture from another thread while the adapter reads it.
#[test]
#[ignore = "SQLITE_BUSY cannot be reliably triggered for read-only opens in WAL mode; see is_busy_detects_database_busy_code below"]
fn wal_lock_falls_back_to_copy() {
let dir = tempfile::tempdir().unwrap();
let src_db = dir.path().join("state.vscdb");
std::fs::copy(fixture_db_path(), &src_db).unwrap();
let src_db_clone = src_db.clone();
let handle = std::thread::spawn(move || {
let conn =
Connection::open_with_flags(&src_db_clone, OpenFlags::SQLITE_OPEN_READ_WRITE)
.unwrap();
conn.execute_batch("BEGIN EXCLUSIVE;").unwrap();
std::thread::sleep(Duration::from_secs(2));
conn.execute_batch("ROLLBACK;").unwrap();
});
// Give the writer thread time to take the lock before reading.
std::thread::sleep(Duration::from_millis(50));
let adapter = CursorAdapter::with_db_root(dir.path().to_path_buf());
let since = empty_cursor(src_db.clone());
let result = adapter.read_new_records(&since);
assert!(
result.is_ok(),
"WAL fallback must succeed; got: {:?}",
result.err()
);
handle.join().unwrap();
}
// `is_busy` must accept DatabaseBusy and SystemIoFailure and reject other codes.
#[test]
fn is_busy_detects_database_busy_code() {
use rusqlite::ffi::{Error as SqliteErr, ErrorCode};
let busy_err = rusqlite::Error::SqliteFailure(
SqliteErr {
code: ErrorCode::DatabaseBusy,
extended_code: 5,
},
None,
);
assert!(is_busy(&busy_err), "DatabaseBusy must be detected as busy");
let io_err = rusqlite::Error::SqliteFailure(
SqliteErr {
code: ErrorCode::SystemIoFailure,
extended_code: 10,
},
None,
);
assert!(is_busy(&io_err), "SystemIoFailure must be detected as busy");
let other_err = rusqlite::Error::SqliteFailure(
SqliteErr {
code: ErrorCode::NotADatabase,
extended_code: 26,
},
None,
);
assert!(!is_busy(&other_err), "NotADatabase must NOT be busy");
}
// A cursor whose `db_path` is `/tmp` must be rejected with `PathNotFound` when the
// adapter is rooted in a freshly created temp directory.
#[test]
fn containment_check_rejects_paths_outside_root() {
let dir = tempfile::tempdir().unwrap();
let adapter = CursorAdapter::with_db_root(dir.path().to_path_buf());
let outside_path = PathBuf::from("/tmp");
let since = CursorCursor {
db_path: outside_path,
last_rowid: 0,
last_msg_id: String::new(),
};
let err = adapter.read_new_records(&since).unwrap_err();
assert!(
matches!(err, AdapterError::PathNotFound(_)),
"expected PathNotFound for path outside root, got: {:?}",
err
);
}
// A 65 MiB value (one MiB over the cap) must be rejected with the size-cap parse error.
#[test]
fn read_cap_rejects_oversized_value() {
let dir = tempfile::tempdir().unwrap();
let db_path = dir.path().join("state.vscdb");
{
// Scoped so the writing connection is closed before the adapter opens the file.
let conn = Connection::open(&db_path).unwrap();
conn.execute_batch("CREATE TABLE ItemTable (key TEXT PRIMARY KEY, value BLOB);")
.unwrap();
let oversized: Vec<u8> = vec![b'x'; 65 * 1024 * 1024];
conn.execute(
"INSERT INTO ItemTable (key, value) VALUES (?1, ?2)",
rusqlite::params!["aiService.generations", oversized],
)
.unwrap();
}
let adapter = CursorAdapter::with_db_root(dir.path().to_path_buf());
let since = empty_cursor(db_path);
let err = adapter.read_new_records(&since).unwrap_err();
match err {
AdapterError::Parse { context, .. } => {
assert_eq!(
context, "value exceeds size cap",
"expected size-cap context"
);
}
other => panic!("expected AdapterError::Parse, got {:?}", other),
}
}
// `detect` only checks that the file exists; an empty file is enough.
#[test]
fn detect_returns_db_path_when_present() {
let dir = tempfile::tempdir().unwrap();
std::fs::write(dir.path().join("state.vscdb"), b"").unwrap();
let adapter = CursorAdapter::with_db_root(dir.path().to_path_buf());
let result = adapter.detect().unwrap();
assert!(result.is_some(), "detect must return Some when file exists");
assert_eq!(result.unwrap(), dir.path().join("state.vscdb"));
}
// With no `state.vscdb` in the root, `detect` must report `None` rather than an error.
#[test]
fn detect_returns_none_when_absent() {
let dir = tempfile::tempdir().unwrap();
let adapter = CursorAdapter::with_db_root(dir.path().to_path_buf());
let result = adapter.detect().unwrap();
assert!(
result.is_none(),
"detect must return None when file is absent"
);
}
}