use alloc::string::String;
use alloc::vec;
use alloc::vec::Vec;
use super::diff::{DiffEngine, DiffOptions};
use super::history::{VersionHistory, VersionHistoryProvider};
use super::parser::QueryParser;
use super::resolver::{TimeSpecResolver, TxgHistoryProvider};
use super::restore::{RestoreEngine, RestoreOptions, RestoreResult, RestoreTarget};
use super::types::{
AggregateResult, ChangeType, DiffEntry, FileVersion, OrderBy, QueryResult, QueryRow,
SnapshotInfo, TimeError, TimeQuery, TimeSpec,
};
use super::walker::{HistoricalTreeProvider, HistoricalTreeWalker, WalkOptions};
/// Umbrella bound for the backing store used by [`TimeTravelEngine`].
///
/// It combines [`TxgHistoryProvider`] and [`VersionHistoryProvider`] and declares
/// no methods of its own.
pub trait TimeTravelProvider: TxgHistoryProvider + VersionHistoryProvider {}
// Blanket impl: every type that implements both supertraits is automatically a
// `TimeTravelProvider`, so implementors never write this impl by hand.
impl<T: TxgHistoryProvider + VersionHistoryProvider> TimeTravelProvider for T {}
/// Front end that runs time-travel queries (textual or pre-parsed) against a
/// borrowed [`TimeTravelProvider`].
///
/// The engine stores nothing but a shared reference to the provider.
pub struct TimeTravelEngine<'a, P: TimeTravelProvider> {
/// Backing store handed to every resolver, walker, diff, history and restore helper.
provider: &'a P,
}
impl<'a, P: TimeTravelProvider> TimeTravelEngine<'a, P> {
pub fn new(provider: &'a P) -> Self {
Self { provider }
}
/// Parses `sql` with [`QueryParser`] and executes the resulting query.
///
/// # Errors
/// Propagates the parser's error when `sql` cannot be parsed; otherwise returns
/// whatever [`execute`](Self::execute) returns.
pub fn query(&self, sql: &str) -> Result<QueryResult, TimeError> {
    self.execute(&QueryParser::parse(sql)?)
}
/// Runs an already-parsed [`TimeQuery`] by dispatching on its variant.
///
/// Owned inputs that the helpers consume (filter, ordering, change types,
/// destination path) are cloned out of the borrowed query. The `columns`
/// projection of a `Select` is not consulted here.
///
/// # Errors
/// Returns whatever the selected helper reports.
pub fn execute(&self, query: &TimeQuery) -> Result<QueryResult, TimeError> {
    match *query {
        TimeQuery::Select {
            columns: _,
            ref path,
            ref time,
            ref filter,
            limit,
            ref order_by,
        } => self.execute_select(path, time, filter.clone(), limit, order_by.clone()),

        TimeQuery::Diff {
            ref path,
            ref from,
            ref to,
            ref change_types,
        } => self.execute_diff(path, from, to, change_types.clone()),

        TimeQuery::Versions { ref path, limit } => self.execute_versions(path, limit),

        TimeQuery::Restore {
            ref path,
            ref time,
            ref dest_path,
        } => self.execute_restore_query(path, time, dest_path.clone()),

        TimeQuery::ShowSnapshots { ref path } => self.execute_show_snapshots(path),

        TimeQuery::Aggregate {
            ref functions,
            ref path,
            ref time,
            ref filter,
        } => self.execute_aggregate(functions, path, time, filter.clone()),
    }
}
/// Lists the entries at `path` as they were at `time`.
///
/// A directory is walked with `filter`; any other entry yields a single row.
/// Rows are sorted by `order_by` (if any) and only then cut to `limit`, which is
/// why the walk itself is always run without a limit.
///
/// An `order_by` column that is not one of `path`, `size`, `mtime`, `ctime`,
/// `atime` or `txg` compares every pair as equal.
///
/// # Errors
/// `TimeError::PathNotFound` when `path` does not exist at the resolved
/// transaction group; resolver, lookup and walk errors are propagated.
fn execute_select(
    &self,
    path: &str,
    time: &TimeSpec,
    filter: Option<super::types::Filter>,
    limit: Option<usize>,
    order_by: Option<OrderBy>,
) -> Result<QueryResult, TimeError> {
    let txg = TimeSpecResolver::new(self.provider).resolve(time)?;
    let walker = HistoricalTreeWalker::new(self.provider, txg);

    if !walker.exists(path) {
        return Err(TimeError::PathNotFound(path.into()));
    }

    let root = walker.lookup(path)?;
    let entries = if root.is_dir() {
        // The limit is applied after sorting, so the walk must see everything.
        let walk_options = WalkOptions {
            filter,
            limit: None,
            ..Default::default()
        };
        walker.walk(path, &walk_options)?
    } else {
        vec![root]
    };

    let mut rows: Vec<QueryRow> = entries.iter().map(|entry| entry.to_query_row()).collect();

    if let Some(order) = order_by.as_ref() {
        // Pick the comparison for the requested column once, up front.
        let by_column: fn(&QueryRow, &QueryRow) -> core::cmp::Ordering =
            match order.column.as_str() {
                "path" => |a: &QueryRow, b: &QueryRow| a.path.cmp(&b.path),
                "size" => |a: &QueryRow, b: &QueryRow| a.size.cmp(&b.size),
                "mtime" => |a: &QueryRow, b: &QueryRow| a.mtime.cmp(&b.mtime),
                "ctime" => |a: &QueryRow, b: &QueryRow| a.ctime.cmp(&b.ctime),
                "atime" => |a: &QueryRow, b: &QueryRow| a.atime.cmp(&b.atime),
                "txg" => |a: &QueryRow, b: &QueryRow| a.txg.cmp(&b.txg),
                _ => |_: &QueryRow, _: &QueryRow| core::cmp::Ordering::Equal,
            };
        rows.sort_by(|a, b| {
            let ordering = by_column(a, b);
            if order.ascending {
                ordering
            } else {
                ordering.reverse()
            }
        });
    }

    if let Some(max_rows) = limit {
        rows.truncate(max_rows);
    }

    Ok(QueryResult::Rows(rows))
}
/// Diffs `path` between the transaction groups that `from` and `to` resolve to.
///
/// `change_types` is passed to the diff engine through `DiffOptions`; every
/// other option keeps its default.
///
/// # Errors
/// Propagates resolver and diff-engine errors.
fn execute_diff(
    &self,
    path: &str,
    from: &TimeSpec,
    to: &TimeSpec,
    change_types: Option<Vec<ChangeType>>,
) -> Result<QueryResult, TimeError> {
    let resolver = TimeSpecResolver::new(self.provider);
    let (from_txg, to_txg) = (resolver.resolve(from)?, resolver.resolve(to)?);

    let differ = DiffEngine::new(self.provider);
    let options = DiffOptions {
        change_types,
        ..Default::default()
    };

    Ok(QueryResult::Diffs(
        differ.diff(path, from_txg, to_txg, &options)?,
    ))
}
/// Fetches the version history of `path`, passing `limit` through to
/// `VersionHistory::get_versions`.
///
/// # Errors
/// Propagates the error reported by the version-history lookup.
fn execute_versions(&self, path: &str, limit: Option<usize>) -> Result<QueryResult, TimeError> {
    let versions = VersionHistory::new(self.provider).get_versions(path, limit)?;
    Ok(QueryResult::Versions(versions))
}
/// Validates a `RESTORE` query and reports what it targets.
///
/// This only resolves `time` and checks that `path` existed at that point; no
/// `RestoreEngine` is involved here. The reported path is `dest_path` when
/// given, otherwise `path` itself.
///
/// # Errors
/// `TimeError::PathNotFound` when `path` is absent at the resolved transaction
/// group; resolver errors are propagated.
fn execute_restore_query(
    &self,
    path: &str,
    time: &TimeSpec,
    dest_path: Option<String>,
) -> Result<QueryResult, TimeError> {
    let txg = TimeSpecResolver::new(self.provider).resolve(time)?;

    if HistoricalTreeWalker::new(self.provider, txg).exists(path) {
        let dest: String = match dest_path {
            Some(explicit) => explicit,
            None => path.into(),
        };
        Ok(QueryResult::Restored { path: dest, txg })
    } else {
        Err(TimeError::PathNotFound(path.into()))
    }
}
/// Returns every snapshot the provider lists.
///
/// `_path` is accepted for symmetry with the query shape but is not used to
/// narrow the listing.
fn execute_show_snapshots(&self, _path: &str) -> Result<QueryResult, TimeError> {
    Ok(QueryResult::Snapshots(self.provider.list_snapshots()))
}
/// Computes the requested aggregate `functions` over the entries at `path` as
/// of `time`.
///
/// A directory is walked with `filter`; any other entry is aggregated on its
/// own. Each function writes its own slot of the [`AggregateResult`], so if the
/// same kind of function appears twice the later one wins. `Count` ignores its
/// argument and counts entries.
///
/// Overflow handling: `Sum` saturates at `u64::MAX` instead of panicking (debug)
/// or wrapping (release), and `Avg` accumulates in `u128`, so the mean stays
/// correct even when the plain `u64` total would not fit.
///
/// # Errors
/// `TimeError::PathNotFound` when `path` does not exist at the resolved
/// transaction group; resolver, lookup and walk errors are propagated.
fn execute_aggregate(
    &self,
    functions: &[super::types::AggregateFunc],
    path: &str,
    time: &TimeSpec,
    filter: Option<super::types::Filter>,
) -> Result<QueryResult, TimeError> {
    let resolver = TimeSpecResolver::new(self.provider);
    let txg = resolver.resolve(time)?;
    let walker = HistoricalTreeWalker::new(self.provider, txg);
    let walk_options = WalkOptions {
        filter,
        ..Default::default()
    };

    if !walker.exists(path) {
        return Err(TimeError::PathNotFound(path.into()));
    }
    let entry = walker.lookup(path)?;
    let entries = if entry.is_dir() {
        walker.walk(path, &walk_options)?
    } else {
        vec![entry]
    };

    let mut result = AggregateResult {
        count: None,
        sum: None,
        avg: None,
        min: None,
        max: None,
    };

    for func in functions {
        match func {
            super::types::AggregateFunc::Count(_) => {
                result.count = Some(entries.len() as u64);
            }
            super::types::AggregateFunc::Sum(col) => {
                // Saturate rather than overflow: large per-entry values (for
                // example timestamps) can exceed u64 after only a few additions.
                let sum = entries
                    .iter()
                    .map(|e| self.get_numeric_value(e, col))
                    .fold(0u64, u64::saturating_add);
                result.sum = Some(sum);
            }
            super::types::AggregateFunc::Avg(col) => {
                result.avg = if entries.is_empty() {
                    None
                } else {
                    // A u128 accumulator cannot overflow for any number of u64
                    // values, and converts to f64 exactly like the u64 total
                    // did whenever that total fits.
                    let total: u128 = entries
                        .iter()
                        .map(|e| u128::from(self.get_numeric_value(e, col)))
                        .sum();
                    Some(total as f64 / entries.len() as f64)
                };
            }
            super::types::AggregateFunc::Min(col) => {
                result.min = entries.iter().map(|e| self.get_numeric_value(e, col)).min();
            }
            super::types::AggregateFunc::Max(col) => {
                result.max = entries.iter().map(|e| self.get_numeric_value(e, col)).max();
            }
        }
    }

    Ok(QueryResult::Aggregate(result))
}
/// Reads the numeric field of `entry` named by `column` as a `u64`.
///
/// Recognised columns are `size`, `blocks`, `nlinks`, `atime`, `ctime`,
/// `mtime`, `txg`, `mode`, `uid` and `gid`. Any other name yields `0`.
fn get_numeric_value(&self, entry: &super::walker::HistoricalEntry, column: &str) -> u64 {
    let value: u64 = match column {
        // Storage-related fields.
        "size" => entry.size,
        "blocks" => entry.blocks,
        "nlinks" => entry.nlinks,
        // Timestamps and transaction group.
        "atime" => entry.atime,
        "ctime" => entry.ctime,
        "mtime" => entry.mtime,
        "txg" => entry.txg,
        // Ownership and permission fields, widened to u64.
        "mode" => entry.mode as u64,
        "uid" => entry.uid as u64,
        "gid" => entry.gid as u64,
        // Unknown column names contribute nothing.
        _ => 0,
    };
    value
}
/// Lists `path` as of `time`, sorted by path ascending, with no filter and no
/// limit.
///
/// # Errors
/// Propagates errors from [`execute`](Self::execute); returns
/// `TimeError::NotSupported` if the result is not a row set.
pub fn ls(&self, path: &str, time: &TimeSpec) -> Result<Vec<QueryRow>, TimeError> {
    let by_path = OrderBy {
        column: "path".into(),
        ascending: true,
    };
    let query = TimeQuery::Select {
        columns: vec![super::types::Column::All],
        path: path.into(),
        time: time.clone(),
        filter: None,
        limit: None,
        order_by: Some(by_path),
    };

    if let QueryResult::Rows(rows) = self.execute(&query)? {
        Ok(rows)
    } else {
        Err(TimeError::NotSupported("unexpected result type".into()))
    }
}
/// Diffs `path` between `from` and `to` without restricting change types.
///
/// # Errors
/// Propagates errors from [`execute`](Self::execute); returns
/// `TimeError::NotSupported` if the result is not a diff list.
pub fn diff(
    &self,
    path: &str,
    from: &TimeSpec,
    to: &TimeSpec,
) -> Result<Vec<DiffEntry>, TimeError> {
    let query = TimeQuery::Diff {
        path: path.into(),
        from: from.clone(),
        to: to.clone(),
        change_types: None,
    };

    if let QueryResult::Diffs(diffs) = self.execute(&query)? {
        Ok(diffs)
    } else {
        Err(TimeError::NotSupported("unexpected result type".into()))
    }
}
/// Returns the full version history of `path` (no limit).
///
/// # Errors
/// Propagates errors from [`execute`](Self::execute); returns
/// `TimeError::NotSupported` if the result is not a version list.
pub fn versions(&self, path: &str) -> Result<Vec<FileVersion>, TimeError> {
    let query = TimeQuery::Versions {
        path: path.into(),
        limit: None,
    };

    let QueryResult::Versions(versions) = self.execute(&query)? else {
        return Err(TimeError::NotSupported("unexpected result type".into()));
    };
    Ok(versions)
}
/// Returns the snapshots reported by a `ShowSnapshots` query on `"/"`.
///
/// # Errors
/// Propagates errors from [`execute`](Self::execute); returns
/// `TimeError::NotSupported` if the result is not a snapshot list.
pub fn snapshots(&self) -> Result<Vec<SnapshotInfo>, TimeError> {
    let query = TimeQuery::ShowSnapshots { path: "/".into() };

    let QueryResult::Snapshots(snaps) = self.execute(&query)? else {
        return Err(TimeError::NotSupported("unexpected result type".into()));
    };
    Ok(snaps)
}
/// Resolves `time` to a transaction group number using [`TimeSpecResolver`].
///
/// # Errors
/// Returns the resolver's error unchanged.
pub fn resolve_time(&self, time: &TimeSpec) -> Result<u64, TimeError> {
    TimeSpecResolver::new(self.provider).resolve(time)
}
/// Reports whether `path` existed at the transaction group `time` resolves to.
///
/// # Errors
/// Fails only when `time` cannot be resolved.
pub fn existed_at(&self, path: &str, time: &TimeSpec) -> Result<bool, TimeError> {
    self.resolve_time(time)
        .map(|txg| HistoricalTreeWalker::new(self.provider, txg).exists(path))
}
/// Looks up `path` at the transaction group `time` resolves to and returns it
/// as a [`QueryRow`].
///
/// # Errors
/// Propagates resolver and lookup errors.
pub fn stat(&self, path: &str, time: &TimeSpec) -> Result<QueryRow, TimeError> {
    let txg = self.resolve_time(time)?;
    let found = HistoricalTreeWalker::new(self.provider, txg).lookup(path)?;
    Ok(found.to_query_row())
}
/// Resolves `time` and hands the restore over to a [`RestoreEngine`] built from
/// the provider and `target`.
///
/// `path`, `dest_path` and `options` are forwarded unchanged to
/// `RestoreEngine::restore` together with the resolved transaction group.
///
/// # Errors
/// Propagates the resolver error, otherwise returns the restore engine's result.
pub fn restore<T: RestoreTarget>(
    &self,
    target: &mut T,
    path: &str,
    time: &TimeSpec,
    dest_path: Option<&str>,
    options: &RestoreOptions,
) -> Result<RestoreResult, TimeError> {
    let txg = self.resolve_time(time)?;
    RestoreEngine::new(self.provider, target).restore(path, dest_path, txg, options)
}
}
#[cfg(test)]
mod tests {
use super::super::history::InMemoryVersionProvider;
use super::super::resolver::TxgTimestamp;
use super::super::types::FileType;
use super::super::walker::HistoricalEntry;
use super::*;
// Builds a fixture entry for `path` as written in transaction group `txg`.
//
// All three timestamps are `txg * 1000`, the checksum is `[txg; 4]`, and the
// block count is `size` rounded up to 512-byte units.
//
// NOTE(review): `object_id` is the path length, so paths of equal length (for
// example "/data/file1.txt" and "/data/file2.txt") share an id.
fn create_entry(
    path: &str,
    name: &str,
    file_type: FileType,
    size: u64,
    txg: u64,
) -> HistoricalEntry {
    let stamp = txg * 1000;
    let blocks = size.div_ceil(512);
    let object_id = path.len() as u64;

    HistoricalEntry {
        name: name.into(),
        path: path.into(),
        object_id,
        parent_id: 1,
        file_type,
        size,
        mode: 0o644,
        uid: 1000,
        gid: 1000,
        atime: stamp,
        mtime: stamp,
        ctime: stamp,
        txg,
        checksum: [txg; 4],
        nlinks: 1,
        blocks,
        generation: txg,
    }
}
// Test double that satisfies all provider traits the engine needs.
struct TestProvider {
// In-memory store that answers tree, version-history and snapshot queries.
version_provider: InMemoryVersionProvider,
// Recorded (txg, timestamp) pairs, in insertion order; backs `TxgHistoryProvider`.
txg_history: Vec<TxgTimestamp>,
}
impl TestProvider {
fn new() -> Self {
Self {
version_provider: InMemoryVersionProvider::new(),
txg_history: Vec::new(),
}
}
fn add_entry(&mut self, txg: u64, entry: HistoricalEntry) {
self.version_provider.add_entry(txg, entry);
}
fn add_txg(&mut self, txg: u64, timestamp: u64) {
self.txg_history.push(TxgTimestamp { txg, timestamp });
self.version_provider.add_txg_timestamp(txg, timestamp);
}
fn add_snapshot(&mut self, info: SnapshotInfo) {
self.version_provider.add_snapshot(info);
}
}
// Answers txg/timestamp questions from the local `txg_history` list and
// snapshot questions from the wrapped provider's `snapshots` collection.
impl TxgHistoryProvider for TestProvider {
    // Highest recorded txg, or 0 when nothing has been recorded.
    fn current_txg(&self) -> u64 {
        self.txg_history
            .iter()
            .fold(0u64, |newest: u64, record| newest.max(record.txg))
    }

    // Highest recorded timestamp, or 0 when nothing has been recorded.
    fn current_timestamp(&self) -> u64 {
        self.txg_history
            .iter()
            .fold(0u64, |latest: u64, record| latest.max(record.timestamp))
    }

    // Lowest recorded txg, or 0 when nothing has been recorded.
    fn min_txg(&self) -> u64 {
        let mut txgs = self.txg_history.iter().map(|record| record.txg);
        match txgs.next() {
            Some(first) => txgs.fold(first, |lowest: u64, txg| lowest.min(txg)),
            None => 0,
        }
    }

    // Timestamp of the first record whose txg matches exactly.
    fn txg_to_timestamp(&self, txg: u64) -> Option<u64> {
        for record in self.txg_history.iter() {
            if record.txg == txg {
                return Some(record.timestamp);
            }
        }
        None
    }

    // Txg of the record with the greatest timestamp not after `timestamp`.
    fn timestamp_to_txg(&self, timestamp: u64) -> Option<u64> {
        let mut best: Option<&TxgTimestamp> = None;
        for candidate in self.txg_history.iter() {
            if candidate.timestamp > timestamp {
                continue;
            }
            // `>=` keeps the later record when timestamps tie.
            let replaces = match best {
                Some(current) => candidate.timestamp >= current.timestamp,
                None => true,
            };
            if replaces {
                best = Some(candidate);
            }
        }
        best.map(|record| record.txg)
    }

    // Copy of the recorded history, in insertion order.
    fn txg_history(&self) -> Vec<TxgTimestamp> {
        self.txg_history.clone()
    }

    // First snapshot with exactly this name, cloned.
    fn lookup_snapshot(&self, name: &str) -> Option<SnapshotInfo> {
        for snapshot in self.version_provider.snapshots.iter() {
            if snapshot.name == name {
                return Some(snapshot.clone());
            }
        }
        None
    }

    // Copy of every registered snapshot.
    fn list_snapshots(&self) -> Vec<SnapshotInfo> {
        self.version_provider.snapshots.clone()
    }
}
// Pure delegation: every tree query is answered by the wrapped
// `InMemoryVersionProvider` with the arguments passed through unchanged.
impl HistoricalTreeProvider for TestProvider {
// Root entry as of `txg`.
fn root_at_txg(&self, txg: u64) -> Result<HistoricalEntry, TimeError> {
self.version_provider.root_at_txg(txg)
}
// Entry at `path` as of `txg`.
fn lookup_at_txg(&self, path: &str, txg: u64) -> Result<HistoricalEntry, TimeError> {
self.version_provider.lookup_at_txg(path, txg)
}
// Directory listing of `path` as of `txg`.
fn readdir_at_txg(&self, path: &str, txg: u64) -> Result<Vec<HistoricalEntry>, TimeError> {
self.version_provider.readdir_at_txg(path, txg)
}
// Entry with the given object id as of `txg`.
fn lookup_by_id_at_txg(
&self,
object_id: u64,
txg: u64,
) -> Result<HistoricalEntry, TimeError> {
self.version_provider.lookup_by_id_at_txg(object_id, txg)
}
// Whether `path` exists as of `txg`.
fn exists_at_txg(&self, path: &str, txg: u64) -> bool {
self.version_provider.exists_at_txg(path, txg)
}
// Link target of `path` as of `txg`.
fn readlink_at_txg(&self, path: &str, txg: u64) -> Result<String, TimeError> {
self.version_provider.readlink_at_txg(path, txg)
}
}
// Pure delegation to the wrapped `InMemoryVersionProvider`.
impl VersionHistoryProvider for TestProvider {
// Transaction groups recorded for `path`, as reported by the wrapped provider.
fn file_txg_history(&self, path: &str) -> Result<Vec<u64>, TimeError> {
self.version_provider.file_txg_history(path)
}
// All snapshots known to the wrapped provider.
fn all_snapshots(&self) -> Vec<SnapshotInfo> {
self.version_provider.all_snapshots()
}
// Timestamp the wrapped provider has on record for `txg`, if any.
fn txg_timestamp(&self, txg: u64) -> Option<u64> {
self.version_provider.txg_timestamp(txg)
}
}
// Builds the shared fixture:
//   txg 100 (2024-01-01 UTC): /, /data, /data/file1.txt (100 B), /data/file2.txt (200 B)
//   txg 200 (2024-01-15 UTC): /, /data, /data/file1.txt (150 B)
//   txg 300 (2024-01-29 UTC): timestamp only, no entries
// plus one snapshot, "backup-1", taken at txg 200.
fn create_test_provider() -> TestProvider {
    let mut provider = TestProvider::new();

    let timeline = [
        (100, 1704067200),
        (200, 1705276800),
        (300, 1706486400),
    ];
    for (txg, timestamp) in timeline {
        provider.add_txg(txg, timestamp);
    }

    // (txg, path, name, type, size) in insertion order.
    let entries = [
        (100, "/", "", FileType::Directory, 0),
        (100, "/data", "data", FileType::Directory, 0),
        (100, "/data/file1.txt", "file1.txt", FileType::Regular, 100),
        (100, "/data/file2.txt", "file2.txt", FileType::Regular, 200),
        (200, "/", "", FileType::Directory, 0),
        (200, "/data", "data", FileType::Directory, 0),
        (200, "/data/file1.txt", "file1.txt", FileType::Regular, 150),
    ];
    for (txg, path, name, file_type, size) in entries {
        provider.add_entry(txg, create_entry(path, name, file_type, size, txg));
    }

    let backup = SnapshotInfo {
        name: "backup-1".into(),
        creation_time: 1705276800,
        txg: 200,
        referenced: 1024 * 1024,
        used: 512 * 1024,
    };
    provider.add_snapshot(backup);

    provider
}
// SELECT on a directory with a date-based AS OF clause returns one row per
// file; the fixture holds two files under /data at its earliest txg.
#[test]
fn test_query_select() {
    let provider = create_test_provider();
    let engine = TimeTravelEngine::new(&provider);

    let outcome = engine.query("SELECT * FROM /data AS OF '2024-01-01'");
    let QueryResult::Rows(rows) = outcome.unwrap() else {
        panic!("expected rows");
    };

    assert_eq!(rows.len(), 2);
}
// DIFF between the two fixture dates must report at least one change.
#[test]
fn test_query_diff() {
    let provider = create_test_provider();
    let engine = TimeTravelEngine::new(&provider);

    let outcome = engine.query("DIFF /data BETWEEN '2024-01-01' AND '2024-01-15'");
    let QueryResult::Diffs(diffs) = outcome.unwrap() else {
        panic!("expected diffs");
    };

    assert!(!diffs.is_empty());
}
// file1.txt is written at txg 100 and again at txg 200, so it has two versions.
#[test]
fn test_query_versions() {
    let provider = create_test_provider();
    let engine = TimeTravelEngine::new(&provider);

    let outcome = engine.query("VERSIONS /data/file1.txt");
    let QueryResult::Versions(versions) = outcome.unwrap() else {
        panic!("expected versions");
    };

    assert_eq!(versions.len(), 2);
}
// SHOW SNAPSHOTS lists the single snapshot registered by the fixture.
#[test]
fn test_query_show_snapshots() {
    let provider = create_test_provider();
    let engine = TimeTravelEngine::new(&provider);

    let outcome = engine.query("SHOW SNAPSHOTS");
    let QueryResult::Snapshots(snaps) = outcome.unwrap() else {
        panic!("expected snapshots");
    };

    assert_eq!(snaps.len(), 1);
    assert_eq!(snaps[0].name, "backup-1");
}
// COUNT and SUM(size) over the two files present at the earliest txg:
// 2 entries, 100 + 200 bytes.
#[test]
fn test_query_aggregate() {
    let provider = create_test_provider();
    let engine = TimeTravelEngine::new(&provider);

    let outcome = engine.query("SELECT COUNT(*), SUM(size) FROM /data AS OF '2024-01-01'");
    let QueryResult::Aggregate(agg) = outcome.unwrap() else {
        panic!("expected aggregate");
    };

    assert_eq!(agg.count, Some(2));
    assert_eq!(agg.sum, Some(300));
}
// `ls` on /data at txg 100 returns both fixture files.
#[test]
fn test_ls() {
    let provider = create_test_provider();
    let engine = TimeTravelEngine::new(&provider);

    let listing = engine.ls("/data", &TimeSpec::Txg(100));

    assert_eq!(listing.unwrap().len(), 2);
}
// `existed_at` is true for a path written at txg 100 and false for a path the
// fixture never creates.
#[test]
fn test_existed_at() {
let provider = create_test_provider();
let engine = TimeTravelEngine::new(&provider);
// Present: the fixture adds this file at txg 100.
assert!(
engine
.existed_at("/data/file1.txt", &TimeSpec::Txg(100))
.unwrap()
);
// Absent: a missing path yields Ok(false), not an error.
assert!(
!engine
.existed_at("/nonexistent", &TimeSpec::Txg(100))
.unwrap()
);
}
// `stat` reflects the size recorded for each txg: 100 bytes at txg 100,
// 150 bytes at txg 200.
#[test]
fn test_stat() {
    let provider = create_test_provider();
    let engine = TimeTravelEngine::new(&provider);

    for (txg, expected_size) in [(100, 100), (200, 150)] {
        let row = engine.stat("/data/file1.txt", &TimeSpec::Txg(txg)).unwrap();
        assert_eq!(row.size, expected_size);
    }
}
// Explicit txg, snapshot name and `Now` each resolve to the expected txg.
#[test]
fn test_resolve_time() {
    let provider = create_test_provider();
    let engine = TimeTravelEngine::new(&provider);

    let explicit = engine.resolve_time(&TimeSpec::Txg(100)).unwrap();
    assert_eq!(explicit, 100);

    // "backup-1" was registered at txg 200.
    let by_snapshot = engine.resolve_time(&TimeSpec::Snapshot("backup-1".into()));
    assert_eq!(by_snapshot.unwrap(), 200);

    // The newest txg in the fixture is 300.
    let now = engine.resolve_time(&TimeSpec::Now).unwrap();
    assert_eq!(now, 300);
}
// ORDER BY size DESC puts the larger file first.
#[test]
fn test_query_with_order_by() {
    let provider = create_test_provider();
    let engine = TimeTravelEngine::new(&provider);

    let outcome = engine.query("SELECT * FROM /data AS OF TXG 100 ORDER BY size DESC");
    let QueryResult::Rows(rows) = outcome.unwrap() else {
        panic!("expected rows");
    };

    assert_eq!(rows.len(), 2);
    assert!(rows[0].size >= rows[1].size);
}
// LIMIT 1 trims the two-file listing down to a single row.
#[test]
fn test_query_with_limit() {
    let provider = create_test_provider();
    let engine = TimeTravelEngine::new(&provider);

    let outcome = engine.query("SELECT * FROM /data AS OF TXG 100 LIMIT 1");
    let QueryResult::Rows(rows) = outcome.unwrap() else {
        panic!("expected rows");
    };

    assert_eq!(rows.len(), 1);
}
// Text that is not a recognised statement must surface as an error from
// `query`, not a panic.
#[test]
fn test_invalid_query() {
let provider = create_test_provider();
let engine = TimeTravelEngine::new(&provider);
let result = engine.query("INVALID QUERY");
assert!(result.is_err());
}
// Selecting a path the fixture never creates yields `PathNotFound`
// specifically, rather than some other error variant.
#[test]
fn test_path_not_found() {
let provider = create_test_provider();
let engine = TimeTravelEngine::new(&provider);
let result = engine.query("SELECT * FROM /nonexistent AS OF TXG 100");
assert!(matches!(result, Err(TimeError::PathNotFound(_))));
}
}