use crate::document::{self, Document};
use crate::error::{GroundDbError, Result};
use crate::path_template::{self, PathSegment, PathTemplate};
use crate::schema::{
hash_schema, parse_schema, AutoIdStrategy, CollectionDefinition, FieldType, OnConflict,
OnDeletePolicy, SchemaDefinition,
};
use crate::system_db::{compute_directory_hash, SystemDb};
use crate::util::json_to_yaml as json_value_to_yaml;
use crate::validation;
use crate::migration;
use crate::view::{self as view_engine, ViewEngine};
use crate::watcher::{ChangeKind, FileWatcher, WatcherEvent};
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, atomic::{AtomicU64, Ordering}};
/// Opaque handle returned by the subscription APIs; pass it back to
/// `Store::unsubscribe` to cancel the callback.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SubscriptionId(u64);
/// A single mutation observed on a collection, delivered to
/// collection-change subscribers.
#[derive(Debug, Clone)]
pub enum ChangeEvent {
/// A document was created; `data` is its full JSON representation.
Inserted { id: String, data: serde_json::Value },
/// A document was rewritten; `data` is the new full JSON representation.
Updated { id: String, data: serde_json::Value },
/// A document was removed; only its id is available.
Deleted { id: String },
}
// Callback invoked with a view's full row set after the view is rebuilt.
type ViewCallback = Box<dyn Fn(&[serde_json::Value]) + Send>;
// Callback invoked with each insert/update/delete event on a collection.
type CollectionCallback = Box<dyn Fn(ChangeEvent) + Send>;
// A registered subscription: either watching one named view's data or one
// named collection's change stream.
enum Subscription {
View {
view_name: String,
callback: ViewCallback,
},
Collection {
collection_name: String,
callback: CollectionCallback,
},
}
// Thread-safe registry of live subscriptions, keyed by a monotonically
// increasing id that is never reused.
struct SubscriptionManager {
// Next id to hand out (starts at 1).
next_id: AtomicU64,
subs: Mutex<HashMap<u64, Subscription>>,
}
impl SubscriptionManager {
    /// Creates an empty manager; ids start at 1.
    fn new() -> Self {
        Self {
            next_id: AtomicU64::new(1),
            subs: Mutex::new(HashMap::new()),
        }
    }
    /// Reserves the next unique subscription id.
    fn alloc_id(&self) -> u64 {
        self.next_id.fetch_add(1, Ordering::Relaxed)
    }
    /// Registers a callback fired whenever the named view is rebuilt.
    fn add_view_sub(&self, view_name: &str, callback: ViewCallback) -> SubscriptionId {
        let id = self.alloc_id();
        self.subs.lock().unwrap().insert(
            id,
            Subscription::View {
                view_name: view_name.to_string(),
                callback,
            },
        );
        SubscriptionId(id)
    }
    /// Registers a callback fired on every change to the named collection.
    fn add_collection_sub(&self, collection: &str, callback: CollectionCallback) -> SubscriptionId {
        let id = self.alloc_id();
        self.subs.lock().unwrap().insert(
            id,
            Subscription::Collection {
                collection_name: collection.to_string(),
                callback,
            },
        );
        SubscriptionId(id)
    }
    /// Drops the subscription; a no-op if the id was already removed.
    fn remove(&self, id: SubscriptionId) {
        self.subs.lock().unwrap().remove(&id.0);
    }
    /// Invokes every view subscriber registered for `view_name`.
    fn notify_view(&self, view_name: &str, data: &[serde_json::Value]) {
        let guard = self.subs.lock().unwrap();
        for sub in guard.values() {
            match sub {
                Subscription::View { view_name: vn, callback } if vn == view_name => {
                    callback(data)
                }
                _ => {}
            }
        }
    }
    /// Invokes every collection subscriber registered for `collection`.
    fn notify_collection(&self, collection: &str, event: ChangeEvent) {
        let guard = self.subs.lock().unwrap();
        for sub in guard.values() {
            match sub {
                Subscription::Collection { collection_name, callback }
                    if collection_name == collection =>
                {
                    callback(event.clone())
                }
                _ => {}
            }
        }
    }
}
/// A file-backed document store: document files on disk are the source of
/// truth, indexed into a system database for lookups and view queries.
pub struct Store {
// Absolute path of the data directory containing schema.yaml.
root: PathBuf,
schema: SchemaDefinition,
// Raw schema text, kept for hashing / change detection.
schema_yaml: String,
db: SystemDb,
// Parsed file-path template per collection name.
path_templates: HashMap<String, PathTemplate>,
view_engine: ViewEngine,
subscriptions: Arc<SubscriptionManager>,
// Keeps the watcher alive; None until `watch()` is called.
_watcher: Mutex<Option<FileWatcher>>,
}
impl Store {
/// Opens a store rooted at `path` (absolute, or relative to the current
/// working directory). The directory and its `schema.yaml` must already
/// exist. Boots the index (scan/migrate) and loads cached view data.
pub fn open(path: &str) -> Result<Self> {
// Resolve to an absolute root so later joins/strip_prefix are stable.
let root = {
let p = PathBuf::from(path);
if p.is_absolute() {
p
} else {
std::env::current_dir()
.map_err(|e| GroundDbError::Other(format!(
"Failed to resolve data directory: {e}"
)))?
.join(p)
}
};
if !root.exists() {
return Err(GroundDbError::Other(format!(
"Data directory does not exist: {}",
root.display()
)));
}
let schema_path = root.join("schema.yaml");
if !schema_path.exists() {
return Err(GroundDbError::Schema(format!(
"schema.yaml not found in {}",
root.display()
)));
}
// Keep both the raw YAML (for hashing) and the parsed schema.
let schema_yaml = std::fs::read_to_string(&schema_path)?;
let schema = parse_schema(&schema_path)?;
let db_path = root.join("_system.db");
let db = SystemDb::open(&db_path)?;
// Pre-parse every collection's path template once, up front.
let mut path_templates = HashMap::new();
for (name, collection) in &schema.collections {
let template = PathTemplate::parse(&collection.path)?;
path_templates.insert(name.clone(), template);
}
let view_engine = ViewEngine::new(&schema)?;
let store = Store {
root,
schema,
schema_yaml,
db,
path_templates,
view_engine,
subscriptions: Arc::new(SubscriptionManager::new()),
_watcher: Mutex::new(None),
};
// boot() scans/migrates; then restore persisted view results.
store.boot()?;
store.view_engine.load_from_db(&store.db)?;
Ok(store)
}
/// Boot-time sync: compares the current schema hash against the last
/// recorded one to decide between a full rescan (schema changed or first
/// boot) and a cheaper incremental scan, then rebuilds all static views.
///
/// Fix: `Some(¤t_hash)` / `record_schema(¤t_hash, ...)` were
/// HTML-entity-corrupted forms of `&current_hash` (`&curren;` → `¤`) and
/// did not compile; restored the intended references.
fn boot(&self) -> Result<()> {
    let current_hash = hash_schema(&self.schema_yaml);
    let last_hash = self.db.get_last_schema_hash()?;
    if last_hash.as_deref() != Some(&current_hash) {
        // Schema changed (or no hash recorded yet): migrate from the
        // previous schema if one exists, persist the new one, and
        // re-index every collection from disk.
        if let Some(old_yaml) = self.db.get_last_schema_yaml()? {
            self.run_schema_migration(&old_yaml)?;
        }
        self.db.record_schema(&current_hash, &self.schema_yaml)?;
        self.full_scan()?;
    } else {
        self.incremental_scan()?;
    }
    self.rebuild_all_static_views()?;
    Ok(())
}
/// Applies the diff between a previously recorded schema and the current
/// one. Unsafe migrations (new required field without a default, changed
/// field type) abort with an error before any data is touched; other
/// diffs are applied, recorded, or logged.
fn run_schema_migration(&self, old_yaml: &str) -> Result<()> {
use crate::schema::parse_schema_str;
// If the old schema no longer parses we cannot diff against it; warn
// and continue rather than blocking startup.
let old_schema = match parse_schema_str(old_yaml) {
Ok(s) => s,
Err(e) => {
log::warn!("Failed to parse old schema for migration: {e}");
return Ok(());
}
};
let migrations = migration::diff_schemas(&old_schema, &self.schema);
if migrations.is_empty() {
return Ok(());
}
// First pass: reject anything we cannot apply automatically.
let unsafe_migrations = migration::has_unsafe_migrations(&migrations);
for m in &unsafe_migrations {
match m {
migration::SchemaMigration::FieldAdded { required: true, has_default: false, collection, field, .. } => {
return Err(GroundDbError::Schema(format!(
"Migration error: new required field '{}.{}' has no default value",
collection, field
)));
}
migration::SchemaMigration::FieldTypeChanged { collection, field } => {
return Err(GroundDbError::Schema(format!(
"Migration error: field type changed for '{}.{}'",
collection, field
)));
}
other => {
log::warn!("Schema migration warning: {}", other.describe());
}
}
}
// Second pass: apply each migration.
for m in &migrations {
match m {
migration::SchemaMigration::CollectionAdded { name } => {
// New collection: just make sure its base directory exists.
let template = &self.path_templates[name];
let base_dir = self.root.join(template.base_directory());
if !base_dir.exists() {
std::fs::create_dir_all(&base_dir)?;
}
self.db.record_migration(&m.describe())?;
}
migration::SchemaMigration::FieldAdded { collection, field, has_default: true, .. } => {
// Backfill: write the default value into every existing
// document that does not already contain the field.
let field_def = &self.schema.collections[collection].fields[field];
if let Some(default_val) = &field_def.default {
let records = self.db.list_documents(collection)?;
for record in &records {
let mut data = record.parse_data()?;
if let Some(mapping) = data.as_mapping_mut() {
let key = serde_yaml::Value::String(field.clone());
if !mapping.contains_key(&key) {
mapping.insert(key, default_val.clone());
let file_path = self.root.join(&record.path);
// Re-read the file first so its body text survives
// the rewrite.
let existing_doc = document::read_document(&file_path)?;
document::write_document(&file_path, &data, existing_doc.content.as_deref())?;
// Timestamps come from file metadata; created()
// falls back to mtime when unavailable.
let meta = std::fs::metadata(&file_path)?;
let created: chrono::DateTime<chrono::Utc> = meta
.created()
.unwrap_or(meta.modified()?)
.into();
let modified: chrono::DateTime<chrono::Utc> = meta.modified()?.into();
self.db.upsert_document(
&record.id,
&record.collection,
&record.path,
&data,
Some(&created.to_rfc3339()),
Some(&modified.to_rfc3339()),
existing_doc.content.as_deref(),
)?;
}
}
}
}
self.db.record_migration(&m.describe())?;
}
migration::SchemaMigration::EnumValueAdded { .. } => {
self.db.record_migration(&m.describe())?;
}
migration::SchemaMigration::DefaultChanged { .. } => {
self.db.record_migration(&m.describe())?;
}
_ => {
log::info!("Skipping migration: {}", m.describe());
}
}
}
Ok(())
}
/// Rebuilds every non-parameterized (static) view defined in the schema;
/// query templates are skipped since they run on demand.
fn rebuild_all_static_views(&self) -> Result<()> {
    let names: Vec<String> = self.schema.views.keys().cloned().collect();
    for name in &names {
        let is_static = self
            .view_engine
            .get_view(name)
            .map_or(false, |parsed| !parsed.is_query_template);
        if is_static {
            self.rebuild_view(name)?;
        }
    }
    Ok(())
}
/// Re-indexes every collection from disk, unconditionally.
fn full_scan(&self) -> Result<()> {
    self.schema
        .collections
        .keys()
        .try_for_each(|name| self.scan_collection(name))
}
fn incremental_scan(&self) -> Result<()> {
for (name, _collection) in &self.schema.collections {
let stored_hash = self.db.get_directory_hash(name)?;
let current_hash = self.compute_collection_hash(name)?;
if stored_hash.as_deref() != Some(¤t_hash) {
self.scan_collection(name)?;
}
}
Ok(())
}
/// Rebuilds the system-db index for one collection from the files on
/// disk, then records the directory hash used by incremental scans.
fn scan_collection(&self, name: &str) -> Result<()> {
let collection = &self.schema.collections[name];
let template = &self.path_templates[name];
let base_dir = self.root.join(template.base_directory());
if !base_dir.exists() {
// No directory yet: create it and record the hash of an empty
// listing so the next incremental scan matches.
std::fs::create_dir_all(&base_dir)?;
self.db
.set_directory_hash(name, &compute_directory_hash(&[]))?;
return Ok(());
}
let ext = collection.file_extension();
let pattern = format!("{}/**/*.{}", base_dir.display(), ext);
let files: Vec<PathBuf> = glob::glob(&pattern)
.map_err(|e| GroundDbError::Other(format!("Glob error: {e}")))?
.filter_map(|r| r.ok())
.collect();
// Replace the collection's index wholesale rather than diffing.
self.db.delete_collection_documents(name)?;
let mut entries = Vec::new();
for file_path in &files {
let doc = document::read_document(file_path)?;
// Index paths relative to the root with `/` separators so the
// stored paths are portable across platforms.
let rel_path = file_path
.strip_prefix(&self.root)
.unwrap_or(file_path)
.to_string_lossy()
.replace('\\', "/");
let created_str = doc.created_at.to_rfc3339();
let modified_str = doc.modified_at.to_rfc3339();
self.db.upsert_document(
&doc.id,
name,
&rel_path,
&doc.data,
Some(&created_str),
Some(&modified_str),
doc.content.as_deref(),
)?;
// Collect (file name, mtime seconds) pairs for the directory hash.
let mtime = std::fs::metadata(file_path)?
.modified()?
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
entries.push((
file_path
.file_name()
.unwrap_or_default()
.to_string_lossy()
.to_string(),
mtime,
));
}
let hash = compute_directory_hash(&entries);
self.db.set_directory_hash(name, &hash)?;
Ok(())
}
fn compute_collection_hash(&self, name: &str) -> Result<String> {
let collection = &self.schema.collections[name];
let template = &self.path_templates[name];
let base_dir = self.root.join(template.base_directory());
if !base_dir.exists() {
return Ok(compute_directory_hash(&[]));
}
let ext = collection.file_extension();
let pattern = format!("{}/**/*.{}", base_dir.display(), ext);
let files: Vec<PathBuf> = glob::glob(&pattern)
.map_err(|e| GroundDbError::Other(format!("Glob error: {e}")))?
.filter_map(|r| r.ok())
.collect();
let mut entries = Vec::new();
for file_path in &files {
let mtime = std::fs::metadata(&file_path)?
.modified()?
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
entries.push((
file_path
.file_name()
.unwrap_or_default()
.to_string_lossy()
.to_string(),
mtime,
));
}
Ok(compute_directory_hash(&entries))
}
/// Returns a handle to the named collection, or an error when the schema
/// does not define it.
pub fn collection(&self, name: &str) -> Result<Collection<'_>> {
    if self.schema.collections.contains_key(name) {
        Ok(Collection {
            store: self,
            name: name.to_string(),
        })
    } else {
        Err(GroundDbError::Other(format!(
            "Collection '{name}' not found in schema"
        )))
    }
}
/// Returns the parsed schema definition.
pub fn schema(&self) -> &SchemaDefinition {
&self.schema
}
/// Returns the absolute root directory of the store.
pub fn root(&self) -> &Path {
&self.root
}
/// Fetches one document by id, deserializing its data into `T`. The path
/// comes from the system index but the file itself is re-read, so the
/// file on disk remains the source of truth.
pub fn get_document<T: DeserializeOwned>(
    &self,
    collection_name: &str,
    id: &str,
) -> Result<Document<T>> {
    let record = match self.db.get_document(collection_name, id)? {
        Some(r) => r,
        None => {
            return Err(GroundDbError::NotFound {
                collection: collection_name.to_string(),
                id: id.to_string(),
            })
        }
    };
    let raw = document::read_document(&self.root.join(&record.path))?;
    let typed: T = serde_yaml::from_value(raw.data)?;
    Ok(Document {
        id: raw.id,
        created_at: raw.created_at,
        modified_at: raw.modified_at,
        data: typed,
        content: raw.content,
    })
}
/// Lists every document in the collection typed as `T`, silently skipping
/// files that have vanished, fail to read, or fail to deserialize.
pub fn list_documents<T: DeserializeOwned>(
    &self,
    collection_name: &str,
) -> Result<Vec<Document<T>>> {
    let records = self.db.list_documents(collection_name)?;
    let mut out = Vec::with_capacity(records.len());
    for record in records {
        let path = self.root.join(&record.path);
        if !path.exists() {
            continue;
        }
        let raw = match document::read_document(&path) {
            Ok(d) => d,
            Err(_) => continue,
        };
        if let Ok(data) = serde_yaml::from_value(raw.data) {
            out.push(Document {
                id: raw.id,
                created_at: raw.created_at,
                modified_at: raw.modified_at,
                data,
                content: raw.content,
            });
        }
    }
    Ok(out)
}
/// Serializes `data` and inserts it into `collection_name`, returning the
/// new document's id.
pub fn insert_document<T: Serialize>(
    &self,
    collection_name: &str,
    data: &T,
    content: Option<&str>,
) -> Result<String> {
    self.insert_dynamic(collection_name, serde_json::to_value(data)?, content)
}
/// Serializes `data` and fully replaces the document `id`.
pub fn update_document<T: Serialize>(
    &self,
    collection_name: &str,
    id: &str,
    data: &T,
) -> Result<()> {
    self.update_dynamic(collection_name, id, serde_json::to_value(data)?)
}
/// Serializes `partial` and merges it into the existing document `id`.
pub fn update_partial_document<T: Serialize>(
    &self,
    collection_name: &str,
    id: &str,
    partial: &T,
) -> Result<()> {
    self.update_partial_dynamic(collection_name, id, serde_json::to_value(partial)?)
}
/// Deletes the document `id` from `collection_name`.
pub fn delete_document(&self, collection_name: &str, id: &str) -> Result<()> {
    self.delete_dynamic(collection_name, id)
}
/// Reads a view's cached rows, deserializing each row into `T`.
pub fn read_view<T: DeserializeOwned>(&self, view_name: &str) -> Result<Vec<T>> {
    let rows: Vec<T> = serde_json::from_value(self.view_dynamic(view_name)?)?;
    Ok(rows)
}
/// Runs a parameterized query view: serializes `params` to a string map,
/// executes the view, and deserializes each result row into `T`.
///
/// Fix: `¶ms_json` / `¶ms_map` were HTML-entity-corrupted forms of
/// `&params_json` / `&params_map` (`&para;` → `¶`) and did not compile.
pub fn query_view<T: DeserializeOwned, P: Serialize>(
    &self,
    view_name: &str,
    params: &P,
) -> Result<Vec<T>> {
    let params_json = serde_json::to_value(params)?;
    let params_map = json_to_string_map(&params_json);
    let json = self.query_dynamic(view_name, &params_map)?;
    let rows: Vec<T> = serde_json::from_value(json)?;
    Ok(rows)
}
/// Fetches one document by id as untyped JSON.
pub fn get_dynamic(
    &self,
    collection: &str,
    id: &str,
) -> Result<serde_json::Value> {
    let doc = self.collection(collection)?.get(id)?;
    doc_to_json(&doc)
}
/// Lists documents as untyped JSON, keeping only those whose fields
/// stringify to exactly the requested filter values. Fields of other
/// JSON types (arrays, objects, null) never match a filter.
pub fn list_dynamic(
    &self,
    collection: &str,
    filters: &HashMap<String, String>,
) -> Result<serde_json::Value> {
    let docs = self.collection(collection)?.list()?;
    let mut items = Vec::new();
    for doc in &docs {
        // Documents that fail JSON conversion are skipped silently.
        let json = match doc_to_json(doc) {
            Ok(j) => j,
            Err(_) => continue,
        };
        let matches_all = filters.iter().all(|(key, value)| match json.get(key) {
            Some(serde_json::Value::String(s)) => s == value,
            Some(serde_json::Value::Number(n)) => &n.to_string() == value,
            Some(serde_json::Value::Bool(b)) => &b.to_string() == value,
            _ => false,
        });
        if matches_all {
            items.push(json);
        }
    }
    Ok(serde_json::Value::Array(items))
}
/// Inserts untyped JSON data into `collection`, returning the new id.
pub fn insert_dynamic(
    &self,
    collection: &str,
    data: serde_json::Value,
    content: Option<&str>,
) -> Result<String> {
    let col = self.collection(collection)?;
    col.insert(json_value_to_yaml(&data), content)
}
/// Fully replaces document `id` with untyped JSON data.
pub fn update_dynamic(
    &self,
    collection: &str,
    id: &str,
    data: serde_json::Value,
) -> Result<()> {
    let col = self.collection(collection)?;
    col.update(id, json_value_to_yaml(&data), None)
}
/// Merges untyped partial JSON data into document `id`.
pub fn update_partial_dynamic(
    &self,
    collection: &str,
    id: &str,
    partial_data: serde_json::Value,
) -> Result<()> {
    let col = self.collection(collection)?;
    col.update_partial(id, json_value_to_yaml(&partial_data), None)
}
/// Deletes document `id` from `collection`.
pub fn delete_dynamic(&self, collection: &str, id: &str) -> Result<()> {
    self.collection(collection)?.delete(id)
}
/// Returns a view's rows: prefers the in-memory cache, then the copy
/// persisted in the system db, and finally an empty array.
pub fn view_dynamic(&self, name: &str) -> Result<serde_json::Value> {
    if !self.schema.views.contains_key(name) {
        return Err(GroundDbError::NotFound {
            collection: "views".to_string(),
            id: name.to_string(),
        });
    }
    match self.view_engine.get_view_data(name) {
        Some(data) => Ok(serde_json::Value::Array(data)),
        None => match self.db.get_view_data(name)? {
            Some(json_str) => Ok(serde_json::from_str(&json_str)?),
            None => Ok(serde_json::Value::Array(Vec::new())),
        },
    }
}
/// Executes a (possibly parameterized) view's SQL directly against the
/// system db instead of serving cached rows.
pub fn query_dynamic(
    &self,
    name: &str,
    params: &HashMap<String, String>,
) -> Result<serde_json::Value> {
    if !self.schema.views.contains_key(name) {
        return Err(GroundDbError::NotFound {
            collection: "views".to_string(),
            id: name.to_string(),
        });
    }
    // A view declared in the schema but absent from the engine yields an
    // empty result rather than an error.
    let parsed = match self.view_engine.get_view(name) {
        None => return Ok(serde_json::Value::Array(Vec::new())),
        Some(p) => p.clone(),
    };
    let rewritten = view_engine::rewrite_view_sql(&parsed, &self.schema)?;
    let rows = self.db.query_documents_sql(&rewritten.sql, params)?;
    Ok(serde_json::Value::Array(rows))
}
/// Diffs the current schema against the last recorded one. With
/// `dry_run` the diff is only reported; otherwise the migrations are
/// applied via `run_schema_migration`. Always returns a JSON report.
pub fn migrate(&self, dry_run: bool) -> Result<serde_json::Value> {
use crate::schema::parse_schema_str;
let old_yaml = self.db.get_last_schema_yaml()?;
if old_yaml.is_none() {
return Ok(serde_json::json!({
"message": "No previous schema found. This is the first schema version.",
"migrations": []
}));
}
let old_yaml = old_yaml.unwrap();
// An unparseable old schema is reported in the result, not raised.
let old_schema = match parse_schema_str(&old_yaml) {
Ok(s) => s,
Err(e) => {
return Ok(serde_json::json!({
"error": format!("Failed to parse old schema: {e}"),
"migrations": []
}));
}
};
let migrations = migration::diff_schemas(&old_schema, &self.schema);
let descriptions: Vec<serde_json::Value> = migrations
.iter()
.map(|m| {
serde_json::json!({
"description": m.describe(),
"safe": m.is_safe()
})
})
.collect();
if dry_run {
Ok(serde_json::json!({
"dry_run": true,
"migration_count": migrations.len(),
"migrations": descriptions
}))
} else {
self.run_schema_migration(&old_yaml)?;
Ok(serde_json::json!({
"ok": true,
"applied": migrations.len(),
"migrations": descriptions
}))
}
}
/// Returns a JSON description of a view: its original SQL, the rewritten
/// SQL actually executed, the collections it reads, and limit/template
/// metadata — for debugging and inspection.
pub fn explain_view(&self, name: &str) -> Result<serde_json::Value> {
let parsed = self
.view_engine
.get_view(name)
.ok_or_else(|| GroundDbError::NotFound {
collection: "views".to_string(),
id: name.to_string(),
})?
.clone();
let rewritten = view_engine::rewrite_view_sql(&parsed, &self.schema)?;
let ref_collections = parsed.referenced_collections();
let collections: Vec<&str> = ref_collections
.iter()
.map(|s| s.as_str())
.collect();
Ok(serde_json::json!({
"view": name,
"original_sql": parsed.original_sql.trim(),
"rewritten_sql": rewritten.sql,
"collections": collections,
"limit": rewritten.original_limit,
"buffer_limit": rewritten.buffer_limit,
"is_query_template": parsed.is_query_template,
"param_names": rewritten.param_names,
}))
}
/// Runs schema validation over every document in every collection and
/// returns a per-collection JSON report listing only the documents that
/// produced errors or warnings (plus a total count per collection).
pub fn validate_all(&self) -> Result<serde_json::Value> {
let mut results = serde_json::Map::new();
for (name, collection_def) in &self.schema.collections {
let col = self.collection(name)?;
let docs = col.list()?;
let mut col_results = Vec::new();
for doc in &docs {
let vr = validation::validate_document(&self.schema, collection_def, &doc.data);
// Only report documents that have something to say.
if !vr.is_ok() || vr.has_warnings() {
let mut entry = serde_json::Map::new();
entry.insert("id".into(), serde_json::Value::String(doc.id.clone()));
if !vr.errors.is_empty() {
entry.insert(
"errors".into(),
serde_json::Value::Array(
vr.errors.iter().map(|e| serde_json::Value::String(e.clone())).collect(),
),
);
}
if !vr.warnings.is_empty() {
entry.insert(
"warnings".into(),
serde_json::Value::Array(
vr.warnings.iter().map(|w| serde_json::Value::String(w.clone())).collect(),
),
);
}
col_results.push(serde_json::Value::Object(entry));
}
}
results.insert(
name.clone(),
serde_json::json!({
"total": docs.len(),
"issues": col_results,
}),
);
}
Ok(serde_json::Value::Object(results))
}
/// Summarizes the store as JSON: current schema hash, per-collection
/// document counts, and the names of all defined views.
pub fn status(&self) -> Result<serde_json::Value> {
    let mut collections = serde_json::Map::new();
    for name in self.schema.collections.keys() {
        let count = self.db.list_documents(name)?.len();
        collections.insert(name.clone(), serde_json::json!({ "count": count }));
    }
    Ok(serde_json::json!({
        "schema_hash": hash_schema(&self.schema_yaml),
        "collections": collections,
        "views": self.schema.views.keys().collect::<Vec<_>>(),
    }))
}
/// Starts an empty batch of operations; queue work through
/// `Batch::collection` and apply it with `Batch::execute`.
pub fn batch(&self) -> Batch<'_> {
Batch {
store: self,
ops: Vec::new(),
}
}
/// Re-indexes one collection (or all of them when `collection` is None)
/// from disk, then rebuilds the static views that read from it.
pub fn rebuild(&self, collection: Option<&str>) -> Result<()> {
match collection {
Some(name) => {
self.scan_collection(name)?;
let affected = self.view_engine.affected_views(name);
for view_name in affected {
if let Some(parsed) = self.view_engine.get_view(view_name) {
// Query templates run on demand and are never cached.
if !parsed.is_query_template {
self.rebuild_view(view_name)?;
}
}
}
Ok(())
}
None => {
self.full_scan()?;
self.rebuild_all_static_views()
}
}
}
/// Subscribes to rebuilds of a view; the callback receives the view's
/// full row set. Returns an id usable with `unsubscribe`.
pub fn on_view_change(
&self,
view_name: &str,
callback: Box<dyn Fn(&[serde_json::Value]) + Send>,
) -> SubscriptionId {
self.subscriptions.add_view_sub(view_name, callback)
}
/// Subscribes to insert/update/delete events on a collection.
pub fn on_collection_change(
&self,
collection: &str,
callback: Box<dyn Fn(ChangeEvent) + Send>,
) -> SubscriptionId {
self.subscriptions.add_collection_sub(collection, callback)
}
/// Cancels a subscription; a no-op if the id was already removed.
pub fn unsubscribe(&self, id: SubscriptionId) {
self.subscriptions.remove(id);
}
/// Starts watching every collection's base directory for external file
/// changes. Events are buffered until `process_watcher_events` is called.
pub fn watch(&self) -> Result<()> {
let dirs: Vec<PathBuf> = self
.path_templates
.values()
.map(|t| PathBuf::from(t.base_directory()))
.collect();
let watcher = FileWatcher::start(&self.root, &dirs)
.map_err(|e| GroundDbError::Other(format!("Failed to start file watcher: {e}")))?;
// Store the watcher so it lives as long as the Store; replacing a
// previous watcher drops it.
let mut guard = self._watcher.lock().unwrap();
*guard = Some(watcher);
Ok(())
}
/// Drains all pending watcher events, applies each one to the index, and
/// then — once per affected collection — refreshes the directory hash and
/// rebuilds the static views that read from it.
pub fn process_watcher_events(&self) -> Result<()> {
let guard = self._watcher.lock().unwrap();
let watcher = match guard.as_ref() {
Some(w) => w,
None => return Ok(()),
};
// Drain without blocking; the lock is released before any event is
// processed so event handling can re-enter store APIs safely.
let mut events = Vec::new();
while let Ok(event) = watcher.event_rx.try_recv() {
events.push(event);
}
drop(guard);
if events.is_empty() {
return Ok(());
}
let mut affected_collections = std::collections::HashSet::new();
for event in &events {
// Events outside any collection's base directory are ignored.
if let Some(collection_name) = self.collection_for_path(&event.path) {
affected_collections.insert(collection_name.clone());
self.process_single_watcher_event(&collection_name, event)?;
}
}
// Hashes and views are refreshed once per collection, not per event.
for collection_name in &affected_collections {
let hash = self.compute_collection_hash(collection_name)?;
self.db.set_directory_hash(collection_name, &hash)?;
let affected_views = self.view_engine.affected_views(collection_name);
for view_name in affected_views {
if let Some(parsed) = self.view_engine.get_view(view_name) {
if !parsed.is_query_template {
self.rebuild_view(view_name)?;
}
}
}
}
Ok(())
}
/// Maps an absolute watcher path back to the collection whose base
/// directory contains it, matching on the root-relative path with `\`
/// normalized to `/`. Returns None for paths outside every collection.
///
/// Fix: the previous bare `starts_with(&base)` had no path-segment
/// boundary, so base "posts" would spuriously match "posts-archive/...".
/// Now requires the relative path to equal the base or continue with '/'.
fn collection_for_path(&self, path: &Path) -> Option<String> {
    let rel = path.strip_prefix(&self.root).ok()?;
    let rel_str = rel.to_string_lossy().replace('\\', "/");
    for (name, template) in &self.path_templates {
        let base = template.base_directory();
        // Tolerate a trailing '/' in the configured base directory.
        let base = base.trim_end_matches('/');
        if rel_str == base || rel_str.starts_with(&format!("{base}/")) {
            return Some(name.clone());
        }
    }
    None
}
/// Applies one watcher event to the index: re-reads created/modified
/// files (syncing path-derived field values back into the document when a
/// file was moved by hand), and removes deleted files from the index.
/// Emits the matching ChangeEvent to collection subscribers.
fn process_single_watcher_event(
&self,
collection_name: &str,
event: &WatcherEvent,
) -> Result<()> {
let rel_path = event
.path
.strip_prefix(&self.root)
.unwrap_or(&event.path)
.to_string_lossy()
.replace('\\', "/");
match event.kind {
ChangeKind::Created | ChangeKind::Modified => {
if event.path.exists() {
let mut doc = document::read_document(&event.path)?;
// If the path template embeds field values, a manual file move
// implies new field values: extract them from the path and
// write them back into the document's data.
if let Some(template) = self.path_templates.get(collection_name) {
if let Some(extracted) = template.extract(&rel_path) {
let col_def = self.schema.collections.get(collection_name);
let mut changed = false;
for segment in &template.segments {
let (field_name, has_format) = match segment {
PathSegment::Field { name, format } => (name, format.is_some()),
_ => continue,
};
// `id` and formatted segments are not plain field
// values and cannot be synced back.
if field_name == "id" || has_format {
continue;
}
let path_value = match extracted.get(field_name) {
Some(v) => v,
None => continue,
};
// Compare slugified forms: the path carries the slug,
// the document carries the original value.
let current_slug = doc.data
.as_mapping()
.and_then(|m| m.get(serde_yaml::Value::String(field_name.clone())))
.and_then(|v| v.as_str())
.map(path_template::slugify);
if current_slug.as_deref() == Some(path_value) {
continue; }
// Prefer the enum variant whose slug matches the path
// segment; otherwise store the raw path value.
let new_value = col_def
.and_then(|c| c.fields.get(field_name))
.and_then(|f| f.enum_values.as_ref())
.and_then(|variants| {
variants.iter().find(|v| path_template::slugify(v) == *path_value)
})
.cloned()
.unwrap_or_else(|| path_value.clone());
if let Some(map) = doc.data.as_mapping_mut() {
map.insert(
serde_yaml::Value::String(field_name.clone()),
serde_yaml::Value::String(new_value),
);
changed = true;
}
}
// Persist the synced fields back to the file itself.
if changed {
document::write_document(
&event.path,
&doc.data,
doc.content.as_deref(),
)?;
}
}
}
let created_str = doc.created_at.to_rfc3339();
let modified_str = doc.modified_at.to_rfc3339();
self.db.upsert_document(
&doc.id,
collection_name,
&rel_path,
&doc.data,
Some(&created_str),
Some(&modified_str),
doc.content.as_deref(),
)?;
let change = if event.kind == ChangeKind::Created {
let json_data = serde_json::to_value(&doc.data)?;
ChangeEvent::Inserted {
id: doc.id,
data: json_data,
}
} else {
let json_data = serde_json::to_value(&doc.data)?;
ChangeEvent::Updated {
id: doc.id,
data: json_data,
}
};
self.subscriptions.notify_collection(collection_name, change);
} else {
// Created/Modified event for a path that no longer exists:
// treat it as a deletion keyed by the file stem.
// NOTE(review): this assumes file names are "<id>.<ext>" —
// confirm against the path template's render rules.
let id = event
.path
.file_stem()
.and_then(|s| s.to_str())
.unwrap_or("")
.to_string();
if !id.is_empty() {
self.db.delete_document(collection_name, &id)?;
self.subscriptions.notify_collection(
collection_name,
ChangeEvent::Deleted { id },
);
}
}
}
ChangeKind::Deleted => {
// Same stem-as-id assumption as above.
let id = event
.path
.file_stem()
.and_then(|s| s.to_str())
.unwrap_or("")
.to_string();
if !id.is_empty() {
self.db.delete_document(collection_name, &id)?;
self.subscriptions.notify_collection(
collection_name,
ChangeEvent::Deleted { id },
);
}
}
}
Ok(())
}
/// Post-mutation bookkeeping: refreshes the directory hash (so the next
/// boot's incremental scan sees the change) and rebuilds every static
/// view that reads from this collection.
fn post_write(&self, collection_name: &str) -> Result<()> {
    let hash = self.compute_collection_hash(collection_name)?;
    self.db.set_directory_hash(collection_name, &hash)?;
    for view_name in self.view_engine.affected_views(collection_name) {
        let is_static = self
            .view_engine
            .get_view(view_name)
            .map_or(false, |p| !p.is_query_template);
        if is_static {
            self.rebuild_view(view_name)?;
        }
    }
    Ok(())
}
/// Executes a static view's SQL, caches the rows both in memory and in
/// the system db, notifies view subscribers, and optionally materializes
/// the result to a file.
fn rebuild_view(&self, view_name: &str) -> Result<()> {
let parsed = match self.view_engine.get_view(view_name) {
Some(p) => p.clone(),
None => return Ok(()),
};
let rewritten = view_engine::rewrite_view_sql(&parsed, &self.schema)?;
// When a buffer limit is configured, over-fetch by swapping the
// view's own LIMIT clause for the larger buffer limit.
let exec_sql = if let Some(buffer_limit) = rewritten.buffer_limit {
let base = strip_limit(&rewritten.sql);
format!("{base} LIMIT {buffer_limit}")
} else {
rewritten.sql.clone()
};
let empty_params = HashMap::new();
let rows = self.db.query_documents_sql(&exec_sql, &empty_params)?;
let json_str = serde_json::to_string(&rows)?;
self.db.set_view_data(view_name, &json_str)?;
self.view_engine.set_view_data(view_name, rows.clone());
self.subscriptions.notify_view(view_name, &rows);
if parsed.materialize {
self.view_engine.materialize_view(&self.root, view_name)?;
}
Ok(())
}
}
// A queued batch operation; nothing runs until `Batch::execute`.
enum BatchOp {
Insert {
collection: String,
data: serde_json::Value,
// Optional document body text stored alongside the data.
content: Option<String>,
},
Update {
collection: String,
id: String,
data: serde_json::Value,
},
Delete {
collection: String,
id: String,
},
}
/// A queue of operations applied together by `execute`, with rollback of
/// both the system db and the touched files if any operation fails.
pub struct Batch<'a> {
store: &'a Store,
ops: Vec<BatchOp>,
}
/// Fluent helper that scopes queued batch operations to one collection.
pub struct BatchCollection<'a, 'b> {
batch: &'b mut Batch<'a>,
collection: String,
}
impl<'a> Batch<'a> {
/// Returns a helper that queues operations against collection `name`.
pub fn collection(&mut self, name: &str) -> BatchCollection<'a, '_> {
BatchCollection {
batch: self,
collection: name.to_string(),
}
}
/// Applies every queued operation inside one db transaction, returning
/// the ids touched. On the first failure the transaction is rolled back
/// and the files touched so far are restored (created files deleted,
/// overwritten/deleted files rewritten from saved copies); file
/// restoration is best-effort — its own errors are ignored.
pub fn execute(self) -> Result<Vec<String>> {
// Files created by earlier ops, to delete on rollback.
let mut created_files: Vec<PathBuf> = Vec::new();
// Pre-images of files about to be modified/deleted, to restore.
let mut saved_files: Vec<(PathBuf, Vec<u8>)> = Vec::new();
let mut results: Vec<String> = Vec::new();
self.store.db.begin_transaction()?;
for op in &self.ops {
let res = match op {
BatchOp::Insert { collection, data, content } => {
self.store
.insert_dynamic(collection, data.clone(), content.as_deref())
.map(|id| {
results.push(id.clone());
// Look the new record up to learn which file to undo.
if let Ok(Some(record)) = self.store.db.get_document(collection, &id) {
created_files.push(self.store.root.join(&record.path));
}
})
}
BatchOp::Update { collection, id, data } => {
// Snapshot the current file contents before overwriting.
if let Ok(Some(record)) = self.store.db.get_document(collection, id) {
let file_path = self.store.root.join(&record.path);
if let Ok(content) = std::fs::read(&file_path) {
saved_files.push((file_path, content));
}
}
self.store
.update_dynamic(collection, id, data.clone())
.map(|_| {
results.push(id.clone());
})
}
BatchOp::Delete { collection, id } => {
// Snapshot the file so a later failure can restore it.
if let Ok(Some(record)) = self.store.db.get_document(collection, id) {
let file_path = self.store.root.join(&record.path);
if let Ok(content) = std::fs::read(&file_path) {
saved_files.push((file_path, content));
}
}
self.store
.delete_dynamic(collection, id)
.map(|_| {
results.push(id.clone());
})
}
};
if let Err(e) = res {
// Undo filesystem effects, roll back the db, surface the
// original error.
for path in &created_files {
let _ = std::fs::remove_file(path);
}
for (path, content) in &saved_files {
if let Some(parent) = path.parent() {
let _ = std::fs::create_dir_all(parent);
}
let _ = std::fs::write(path, content);
}
self.store.db.rollback_transaction()?;
return Err(e);
}
}
self.store.db.commit_transaction()?;
Ok(results)
}
}
impl<'a, 'b> BatchCollection<'a, 'b> {
    /// Queues an insert of `data` (with optional body `content`).
    pub fn insert(&mut self, data: serde_json::Value, content: Option<&str>) -> &mut Self {
        let op = BatchOp::Insert {
            collection: self.collection.clone(),
            data,
            content: content.map(|s| s.to_owned()),
        };
        self.batch.ops.push(op);
        self
    }
    /// Queues a full replacement of document `id` with `data`.
    pub fn update(&mut self, id: &str, data: serde_json::Value) -> &mut Self {
        let op = BatchOp::Update {
            collection: self.collection.clone(),
            id: id.to_string(),
            data,
        };
        self.batch.ops.push(op);
        self
    }
    /// Queues a deletion of document `id`.
    pub fn delete(&mut self, id: &str) -> &mut Self {
        let op = BatchOp::Delete {
            collection: self.collection.clone(),
            id: id.to_string(),
        };
        self.batch.ops.push(op);
        self
    }
}
/// A borrowed handle to one schema-defined collection of documents.
pub struct Collection<'a> {
store: &'a Store,
name: String,
}
impl<'a> Collection<'a> {
// Schema definition for this collection; the name was validated when the
// handle was created, so this index cannot miss.
fn definition(&self) -> &CollectionDefinition {
&self.store.schema.collections[&self.name]
}
// Parsed file-path template for this collection.
fn template(&self) -> &PathTemplate {
&self.store.path_templates[&self.name]
}
/// Fetches one document by id as raw YAML data. The path comes from the
/// system index, but the file itself is re-read from disk.
pub fn get(&self, id: &str) -> Result<Document<serde_yaml::Value>> {
    let record = match self.store.db.get_document(&self.name, id)? {
        Some(r) => r,
        None => {
            return Err(GroundDbError::NotFound {
                collection: self.name.clone(),
                id: id.to_string(),
            })
        }
    };
    document::read_document(&self.store.root.join(&record.path))
}
/// Reads every indexed document from disk. Files that have disappeared
/// are skipped silently; unreadable files are skipped with a warning.
pub fn list(&self) -> Result<Vec<Document<serde_yaml::Value>>> {
    let records = self.store.db.list_documents(&self.name)?;
    let mut docs = Vec::with_capacity(records.len());
    for record in &records {
        let file_path = self.store.root.join(&record.path);
        if !file_path.exists() {
            continue;
        }
        match document::read_document(&file_path) {
            Ok(doc) => docs.push(doc),
            Err(e) => log::warn!("Failed to read document {}: {}", record.path, e),
        }
    }
    Ok(docs)
}
/// Validates `data`, renders the file path from the collection's
/// template, writes the file, indexes it, refreshes views, and notifies
/// subscribers. Returns the id actually used — which can differ from the
/// determined id when a path conflict is resolved by suffixing.
pub fn insert(
&self,
mut data: serde_yaml::Value,
content: Option<&str>,
) -> Result<String> {
let definition = self.definition();
if definition.readonly {
return Err(GroundDbError::Other(format!(
"Collection '{}' is readonly",
self.name
)));
}
// May mutate `data` (e.g. fill defaults) before path rendering.
validation::validate_and_prepare(&self.store.schema, definition, &mut data)?;
let id = self.determine_id(&data)?;
let template = self.template();
let rel_path = template.render(&data, Some(&id))?;
let abs_path = self.store.root.join(&rel_path);
if abs_path.exists() {
match definition.on_conflict() {
OnConflict::Error => {
return Err(GroundDbError::PathConflict { path: rel_path });
}
OnConflict::Suffix => {
// Find a free suffixed path, then mirror the normal insert
// flow using the suffixed path's file stem as the id.
let resolved = path_template::resolve_suffix(&rel_path, |p| {
self.store.root.join(p).exists()
});
let abs_resolved = self.store.root.join(&resolved);
document::write_document(&abs_resolved, &data, content)?;
let resolved_id = Path::new(&resolved)
.file_stem()
.and_then(|s| s.to_str())
.unwrap_or(&id)
.to_string();
// Timestamps come from file metadata; created() falls back
// to mtime when unavailable.
let meta = std::fs::metadata(&abs_resolved)?;
let created: chrono::DateTime<chrono::Utc> = meta
.created()
.unwrap_or(meta.modified()?)
.into();
let modified: chrono::DateTime<chrono::Utc> = meta.modified()?.into();
self.store.db.upsert_document(
&resolved_id,
&self.name,
&resolved,
&data,
Some(&created.to_rfc3339()),
Some(&modified.to_rfc3339()),
content,
)?;
self.store.post_write(&self.name)?;
self.store.subscriptions.notify_collection(
&self.name,
ChangeEvent::Inserted {
id: resolved_id.clone(),
data: serde_json::to_value(&data)?,
},
);
return Ok(resolved_id);
}
}
}
// No conflict: plain write-index-notify flow.
document::write_document(&abs_path, &data, content)?;
let meta = std::fs::metadata(&abs_path)?;
let created: chrono::DateTime<chrono::Utc> = meta
.created()
.unwrap_or(meta.modified()?)
.into();
let modified: chrono::DateTime<chrono::Utc> = meta.modified()?.into();
self.store.db.upsert_document(
&id,
&self.name,
&rel_path,
&data,
Some(&created.to_rfc3339()),
Some(&modified.to_rfc3339()),
content,
)?;
self.store.post_write(&self.name)?;
self.store.subscriptions.notify_collection(
&self.name,
ChangeEvent::Inserted {
id: id.clone(),
data: serde_json::to_value(&data)?,
},
);
Ok(id)
}
/// Validates `data`, rewrites the document file (moving it when the path
/// template now renders a different location), re-indexes it, refreshes
/// views, and notifies subscribers. The id never changes on update.
pub fn update(
&self,
id: &str,
mut data: serde_yaml::Value,
content: Option<&str>,
) -> Result<()> {
let definition = self.definition();
if definition.readonly {
return Err(GroundDbError::Other(format!(
"Collection '{}' is readonly",
self.name
)));
}
let record = self
.store
.db
.get_document(&self.name, id)?
.ok_or_else(|| GroundDbError::NotFound {
collection: self.name.clone(),
id: id.to_string(),
})?;
validation::validate_and_prepare(&self.store.schema, definition, &mut data)?;
let template = self.template();
let new_rel_path = template.render(&data, Some(id))?;
let old_abs_path = self.store.root.join(&record.path);
let new_abs_path = self.store.root.join(&new_rel_path);
if record.path != new_rel_path {
// A path-affecting field changed: write the new file first,
// then remove the old one.
document::write_document(&new_abs_path, &data, content)?;
if old_abs_path.exists() {
document::delete_document(&old_abs_path)?;
}
} else {
document::write_document(&new_abs_path, &data, content)?;
}
// Refresh timestamps from the (re)written file's metadata; created()
// falls back to mtime when unavailable.
let meta = std::fs::metadata(&new_abs_path)?;
let created: chrono::DateTime<chrono::Utc> = meta
.created()
.unwrap_or(meta.modified()?)
.into();
let modified: chrono::DateTime<chrono::Utc> = meta.modified()?.into();
self.store.db.upsert_document(
id,
&self.name,
&new_rel_path,
&data,
Some(&created.to_rfc3339()),
Some(&modified.to_rfc3339()),
content,
)?;
self.store.post_write(&self.name)?;
self.store.subscriptions.notify_collection(
&self.name,
ChangeEvent::Updated {
id: id.to_string(),
data: serde_json::to_value(&data)?,
},
);
Ok(())
}
pub fn update_partial(
&self,
id: &str,
partial: serde_yaml::Value,
content: Option<&str>,
) -> Result<()> {
let existing = self.get(id)?;
let mut merged = existing.data;
if let (Some(base_map), Some(partial_map)) =
(merged.as_mapping_mut(), partial.as_mapping())
{
for (key, value) in partial_map {
if *value != serde_yaml::Value::Null {
base_map.insert(key.clone(), value.clone());
}
}
}
let effective_content = content.or(existing.content.as_deref());
self.update(id, merged, effective_content)
}
pub fn delete(&self, id: &str) -> Result<()> {
let definition = self.definition();
if definition.readonly {
return Err(GroundDbError::Other(format!(
"Collection '{}' is readonly",
self.name
)));
}
let record = self
.store
.db
.get_document(&self.name, id)?
.ok_or_else(|| GroundDbError::NotFound {
collection: self.name.clone(),
id: id.to_string(),
})?;
self.check_referential_integrity(id)?;
let abs_path = self.store.root.join(&record.path);
if abs_path.exists() {
document::delete_document(&abs_path)?;
}
self.store.db.delete_document(&self.name, id)?;
self.store.post_write(&self.name)?;
self.store.subscriptions.notify_collection(
&self.name,
ChangeEvent::Deleted {
id: id.to_string(),
},
);
Ok(())
}
fn check_referential_integrity(&self, id: &str) -> Result<()> {
    // Enforce on-delete policies before `<self.name>/<id>` is removed: find
    // every indexed document referencing it and apply the policy configured
    // on the referencing `ref` field (error / cascade / nullify / archive).
    // NOTE(review): cascaded deletes mutate the index while iterating a
    // previously fetched `refs` list — assumes `find_references` returns a
    // detached snapshot; confirm against its implementation.
    let refs = self.store.db.find_references(&self.name, id)?;
    if refs.is_empty() {
        return Ok(());
    }
    for ref_doc in &refs {
        // Skip references whose collection is no longer in the schema.
        if let Some(ref_collection) = self.store.schema.collections.get(&ref_doc.collection) {
            for (field_name, field_def) in &ref_collection.fields {
                if field_def.field_type == FieldType::Ref {
                    if let Some(target) = &field_def.target {
                        if target.targets().contains(&self.name.as_str()) {
                            // Field-level policy takes precedence, falling back
                            // to the referencing collection's default on_delete.
                            let policy = field_def
                                .effective_on_delete(ref_collection.on_delete.as_ref());
                            let data = ref_doc.parse_data()?;
                            if let Some(val) = data.get(field_name) {
                                // A ref value is either a bare id string or a
                                // mapping carrying an `id` key.
                                let ref_id = match val {
                                    serde_yaml::Value::String(s) => Some(s.as_str()),
                                    serde_yaml::Value::Mapping(m) => m
                                        .get(&serde_yaml::Value::String("id".into()))
                                        .and_then(|v| v.as_str()),
                                    _ => None,
                                };
                                if ref_id == Some(id) {
                                    match policy {
                                        OnDeletePolicy::Error => {
                                            // Refuse the delete while a live
                                            // reference exists.
                                            return Err(GroundDbError::ReferentialIntegrity(
                                                format!(
                                                    "Cannot delete {}/{}: referenced by {}/{} (field '{}')",
                                                    self.name, id, ref_doc.collection, ref_doc.id, field_name
                                                ),
                                            ));
                                        }
                                        OnDeletePolicy::Cascade => {
                                            // Delete the referencing document too;
                                            // this recursively re-runs integrity
                                            // checks for that document.
                                            let ref_col =
                                                self.store.collection(&ref_doc.collection)?;
                                            ref_col.delete(&ref_doc.id)?;
                                        }
                                        OnDeletePolicy::Nullify => {
                                            // Blank the referencing field on disk
                                            // and refresh the index row, keeping
                                            // the existing markdown body.
                                            let mut data = ref_doc.parse_data()?;
                                            if let Some(mapping) = data.as_mapping_mut() {
                                                mapping.insert(
                                                    serde_yaml::Value::String(
                                                        field_name.clone(),
                                                    ),
                                                    serde_yaml::Value::Null,
                                                );
                                            }
                                            let file_path =
                                                self.store.root.join(&ref_doc.path);
                                            let existing_doc = document::read_document(&file_path)?;
                                            document::write_document(
                                                &file_path, &data, existing_doc.content.as_deref(),
                                            )?;
                                            let meta = std::fs::metadata(&file_path)?;
                                            // ctime is unavailable on some
                                            // filesystems; fall back to mtime.
                                            let created: chrono::DateTime<chrono::Utc> = meta
                                                .created()
                                                .unwrap_or(meta.modified()?)
                                                .into();
                                            let modified: chrono::DateTime<chrono::Utc> = meta.modified()?.into();
                                            self.store.db.upsert_document(
                                                &ref_doc.id,
                                                &ref_doc.collection,
                                                &ref_doc.path,
                                                &data,
                                                Some(&created.to_rfc3339()),
                                                Some(&modified.to_rfc3339()),
                                                existing_doc.content.as_deref(),
                                            )?;
                                        }
                                        OnDeletePolicy::Archive => {
                                            // Move the referencing file under
                                            // `_archive/` (same relative path) and
                                            // drop its index row.
                                            // NOTE(review): unlike the other write
                                            // paths this emits no post_write or
                                            // subscriber notification — confirm
                                            // that is intentional.
                                            let old_path =
                                                self.store.root.join(&ref_doc.path);
                                            let archive_path = self
                                                .store
                                                .root
                                                .join("_archive")
                                                .join(&ref_doc.path);
                                            document::move_document(&old_path, &archive_path)?;
                                            self.store
                                                .db
                                                .delete_document(
                                                    &ref_doc.collection,
                                                    &ref_doc.id,
                                                )?;
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }
    Ok(())
}
fn determine_id(&self, data: &serde_yaml::Value) -> Result<String> {
let definition = self.definition();
if let Some(strategy) = definition.auto_id() {
return Ok(match strategy {
AutoIdStrategy::Ulid => ulid::Ulid::new().to_string().to_lowercase(),
AutoIdStrategy::Uuid => uuid::Uuid::new_v4().to_string(),
AutoIdStrategy::Nanoid => nanoid::nanoid!(),
});
}
let template = self.template();
let rendered = template.render(data, None)?;
let id = Path::new(&rendered)
.file_stem()
.and_then(|s| s.to_str())
.ok_or_else(|| {
GroundDbError::Other(format!(
"Cannot extract ID from rendered path: {rendered}"
))
})?
.to_string();
Ok(id)
}
}
fn doc_to_json(doc: &Document<serde_yaml::Value>) -> Result<serde_json::Value> {
let data_json = serde_json::to_value(&doc.data)?;
let mut obj = serde_json::Map::new();
obj.insert("id".into(), serde_json::Value::String(doc.id.clone()));
obj.insert(
"created_at".into(),
serde_json::Value::String(doc.created_at.to_rfc3339()),
);
obj.insert(
"modified_at".into(),
serde_json::Value::String(doc.modified_at.to_rfc3339()),
);
if let serde_json::Value::Object(fields) = data_json {
for (k, v) in fields {
obj.insert(k, v);
}
}
if let Some(content) = &doc.content {
obj.insert("content".into(), serde_json::Value::String(content.clone()));
}
Ok(serde_json::Value::Object(obj))
}
fn strip_limit(sql: &str) -> String {
    /// Remove a trailing `LIMIT <n>` clause from a SQL statement, returning
    /// the statement otherwise unchanged (trimmed, trailing `;` dropped).
    /// Inner `LIMIT`s (e.g. inside a CTE) are preserved because anything but
    /// digits/whitespace after the candidate disqualifies it.
    let trimmed = sql.trim().trim_end_matches(';').trim();
    // BUGFIX: use ASCII-only uppercasing so byte offsets into `upper` stay
    // valid for `trimmed`. `to_uppercase()` can change byte length for
    // non-ASCII chars (e.g. 'ﬁ' -> "FI"), shifting offsets and risking a
    // panic or mis-slice below.
    let upper = trimmed.to_ascii_uppercase();
    // Collect every occurrence, then scan from the last match backwards so
    // the trailing clause is found first.
    let mut candidates: Vec<usize> = upper.match_indices("LIMIT ").map(|(i, _)| i).collect();
    candidates.reverse();
    for candidate in candidates {
        // A match at position 0 cannot be a trailing clause of a statement.
        if candidate == 0 {
            continue;
        }
        let before = trimmed.as_bytes()[candidate - 1];
        if matches!(before, b' ' | b'\n' | b'\r' | b'\t') {
            let after_limit = trimmed[candidate + 6..].trim();
            if after_limit
                .chars()
                .all(|c| c.is_ascii_digit() || c.is_whitespace())
            {
                return trimmed[..candidate - 1].trim_end().to_string();
            }
        }
    }
    trimmed.to_string()
}
fn find_all_positions(haystack: &str, needle: &str) -> Vec<usize> {
    /// Return the byte offsets of every (possibly overlapping) occurrence of
    /// `needle` in `haystack`, ordered right-to-left (last match first).
    let mut found = Vec::new();
    let mut cursor = 0;
    // Advance one byte past each match start so overlapping hits are kept.
    loop {
        match haystack[cursor..].find(needle) {
            Some(rel) => {
                found.push(cursor + rel);
                cursor += rel + 1;
            }
            None => break,
        }
    }
    found.reverse();
    found
}
fn json_to_string_map(json: &serde_json::Value) -> HashMap<String, String> {
let mut map = HashMap::new();
if let Some(obj) = json.as_object() {
for (k, v) in obj {
let s = match v {
serde_json::Value::String(s) => s.clone(),
serde_json::Value::Number(n) => n.to_string(),
serde_json::Value::Bool(b) => b.to_string(),
_ => v.to_string(),
};
map.insert(k.clone(), s);
}
}
map
}
#[cfg(test)]
mod tests {
    //! Integration-style tests: each one builds a throwaway store in a temp
    //! directory, drives it through the public API, and asserts on both the
    //! returned values and the on-disk files.
    use super::*;
    use tempfile::TempDir;

    // Store with three collections: `users` (strict, on_delete: error),
    // `posts` (templated path with suffix-on-conflict ids, cascade ref to
    // users, markdown content enabled) and `events` (auto ulid ids, open
    // schema).
    fn setup_test_store() -> (TempDir, Store) {
        let tmp = TempDir::new().unwrap();
        let schema = r#"
collections:
  users:
    path: "users/{name}.md"
    fields:
      name: { type: string, required: true }
      email: { type: string, required: true }
      role: { type: string, enum: [admin, member, guest], default: member }
    additional_properties: false
    strict: true
    on_delete: error
  posts:
    path: "posts/{status}/{date:YYYY-MM-DD}-{title}.md"
    id: { on_conflict: suffix }
    fields:
      title: { type: string, required: true }
      author_id: { type: ref, target: users, required: true, on_delete: cascade }
      date: { type: date, required: true }
      tags: { type: list, items: string }
      status: { type: string, enum: [draft, published, archived], default: draft }
    content: true
    additional_properties: false
    strict: true
  events:
    path: "events/{id}.md"
    id: { auto: ulid }
    fields:
      type: { type: string, required: true }
      payload: { type: object }
    additional_properties: true
    strict: false
"#;
        std::fs::write(tmp.path().join("schema.yaml"), schema).unwrap();
        std::fs::create_dir_all(tmp.path().join("users")).unwrap();
        std::fs::create_dir_all(tmp.path().join("posts")).unwrap();
        std::fs::create_dir_all(tmp.path().join("events")).unwrap();
        let store = Store::open(tmp.path().to_str().unwrap()).unwrap();
        (tmp, store)
    }

    // Opening the store parses all three collections out of schema.yaml.
    #[test]
    fn test_open_store() {
        let (_tmp, store) = setup_test_store();
        assert_eq!(store.schema().collections.len(), 3);
    }

    // Insert slugifies the name into the id and applies the field default
    // (role: member).
    #[test]
    fn test_insert_and_get_user() {
        let (_tmp, store) = setup_test_store();
        let users = store.collection("users").unwrap();
        let data: serde_yaml::Value =
            serde_yaml::from_str("name: Alice Chen\nemail: alice@test.com").unwrap();
        let id = users.insert(data, None).unwrap();
        assert_eq!(id, "alice-chen");
        let doc = users.get("alice-chen").unwrap();
        assert_eq!(doc.id, "alice-chen");
        assert_eq!(
            doc.data["name"],
            serde_yaml::Value::String("Alice Chen".into())
        );
        assert_eq!(
            doc.data["role"],
            serde_yaml::Value::String("member".into())
        );
    }

    // list() returns every inserted document.
    #[test]
    fn test_insert_and_list() {
        let (_tmp, store) = setup_test_store();
        let users = store.collection("users").unwrap();
        let data1: serde_yaml::Value =
            serde_yaml::from_str("name: Alice\nemail: alice@test.com").unwrap();
        let data2: serde_yaml::Value =
            serde_yaml::from_str("name: Bob\nemail: bob@test.com").unwrap();
        users.insert(data1, None).unwrap();
        users.insert(data2, None).unwrap();
        let docs = users.list().unwrap();
        assert_eq!(docs.len(), 2);
    }

    // The markdown body passed at insert time round-trips through get().
    #[test]
    fn test_insert_post_with_content() {
        let (_tmp, store) = setup_test_store();
        let users = store.collection("users").unwrap();
        let user_data: serde_yaml::Value =
            serde_yaml::from_str("name: Alice\nemail: alice@test.com").unwrap();
        users.insert(user_data, None).unwrap();
        let posts = store.collection("posts").unwrap();
        let post_data: serde_yaml::Value = serde_yaml::from_str(
            "title: Hello World\nauthor_id: alice\ndate: '2026-02-13'\nstatus: published",
        )
        .unwrap();
        let id = posts
            .insert(post_data, Some("## Hello\n\nThis is my post."))
            .unwrap();
        let doc = posts.get(&id).unwrap();
        assert_eq!(
            doc.data["title"],
            serde_yaml::Value::String("Hello World".into())
        );
        assert!(doc.content.unwrap().contains("This is my post."));
    }

    // Changing a field used in the path template (status) must physically
    // move the file from posts/draft/ to posts/published/.
    #[test]
    fn test_update_causes_file_movement() {
        let (tmp, store) = setup_test_store();
        let users = store.collection("users").unwrap();
        let user_data: serde_yaml::Value =
            serde_yaml::from_str("name: Alice\nemail: alice@test.com").unwrap();
        users.insert(user_data, None).unwrap();
        let posts = store.collection("posts").unwrap();
        let post_data: serde_yaml::Value = serde_yaml::from_str(
            "title: My Post\nauthor_id: alice\ndate: '2026-02-13'\nstatus: draft",
        )
        .unwrap();
        let id = posts.insert(post_data, Some("Body")).unwrap();
        let draft_path = tmp.path().join("posts/draft/2026-02-13-my-post.md");
        assert!(draft_path.exists(), "Draft file should exist");
        let updated_data: serde_yaml::Value = serde_yaml::from_str(
            "title: My Post\nauthor_id: alice\ndate: '2026-02-13'\nstatus: published",
        )
        .unwrap();
        posts.update(&id, updated_data, Some("Body")).unwrap();
        assert!(!draft_path.exists(), "Draft file should be gone");
        let published_path = tmp.path().join("posts/published/2026-02-13-my-post.md");
        assert!(published_path.exists(), "Published file should exist");
    }

    // After delete, get() must report the document as missing.
    #[test]
    fn test_delete_user() {
        let (_tmp, store) = setup_test_store();
        let users = store.collection("users").unwrap();
        let data: serde_yaml::Value =
            serde_yaml::from_str("name: Alice\nemail: alice@test.com").unwrap();
        users.insert(data, None).unwrap();
        users.delete("alice").unwrap();
        let result = users.get("alice");
        assert!(result.is_err());
    }

    // author_id has on_delete: cascade, so deleting the user removes the
    // post that references them.
    #[test]
    fn test_referential_integrity_cascade() {
        let (_tmp, store) = setup_test_store();
        let users = store.collection("users").unwrap();
        let user_data: serde_yaml::Value =
            serde_yaml::from_str("name: Alice\nemail: alice@test.com").unwrap();
        users.insert(user_data, None).unwrap();
        let posts = store.collection("posts").unwrap();
        let post_data: serde_yaml::Value = serde_yaml::from_str(
            "title: Test Post\nauthor_id: alice\ndate: '2026-02-13'\nstatus: draft",
        )
        .unwrap();
        posts.insert(post_data, Some("Body")).unwrap();
        users.delete("alice").unwrap();
        let post_list = posts.list().unwrap();
        assert_eq!(post_list.len(), 0);
    }

    // `events` uses `id: { auto: ulid }`; the generated id must be non-empty
    // and retrievable.
    #[test]
    fn test_auto_id_generation() {
        let (_tmp, store) = setup_test_store();
        let events = store.collection("events").unwrap();
        let data: serde_yaml::Value = serde_yaml::from_str("type: click").unwrap();
        let id = events.insert(data, None).unwrap();
        assert!(!id.is_empty());
        let doc = events.get(&id).unwrap();
        assert_eq!(
            doc.data["type"],
            serde_yaml::Value::String("click".into())
        );
    }

    // Missing required field (email) is rejected at insert time.
    #[test]
    fn test_validation_rejects_invalid() {
        let (_tmp, store) = setup_test_store();
        let users = store.collection("users").unwrap();
        let data: serde_yaml::Value = serde_yaml::from_str("name: Alice").unwrap();
        let result = users.insert(data, None);
        assert!(result.is_err());
    }

    // `id: { on_conflict: suffix }`: inserting identical data twice yields
    // two distinct ids instead of an error.
    #[test]
    fn test_path_conflict_suffix() {
        let (_tmp, store) = setup_test_store();
        let users = store.collection("users").unwrap();
        let user_data: serde_yaml::Value =
            serde_yaml::from_str("name: Alice\nemail: alice@test.com").unwrap();
        users.insert(user_data, None).unwrap();
        let posts = store.collection("posts").unwrap();
        let post_data: serde_yaml::Value = serde_yaml::from_str(
            "title: Hello\nauthor_id: alice\ndate: '2026-02-13'\nstatus: draft",
        )
        .unwrap();
        let id1 = posts.insert(post_data.clone(), Some("Body 1")).unwrap();
        let id2 = posts.insert(post_data, Some("Body 2")).unwrap();
        assert_ne!(id1, id2);
    }

    // Unknown collection names error rather than panic.
    #[test]
    fn test_collection_not_found() {
        let (_tmp, store) = setup_test_store();
        let result = store.collection("nonexistent");
        assert!(result.is_err());
    }

    // The JSON-based dynamic API (insert/get/list/delete by collection name)
    // mirrors the typed API, flattening metadata into the returned object.
    #[test]
    fn test_dynamic_api() {
        let (_tmp, store) = setup_test_store();
        let data = serde_json::json!({
            "name": "Alice",
            "email": "alice@test.com"
        });
        let id = store.insert_dynamic("users", data, None).unwrap();
        assert_eq!(id, "alice");
        let doc = store.get_dynamic("users", "alice").unwrap();
        assert_eq!(doc["id"], "alice");
        assert_eq!(doc["name"], "Alice");
        assert_eq!(doc["email"], "alice@test.com");
        assert!(doc["created_at"].is_string());
        let list = store
            .list_dynamic("users", &HashMap::new())
            .unwrap();
        assert_eq!(list.as_array().unwrap().len(), 1);
        store.delete_dynamic("users", "alice").unwrap();
        let list = store
            .list_dynamic("users", &HashMap::new())
            .unwrap();
        assert_eq!(list.as_array().unwrap().len(), 0);
    }

    // status() exposes at least the schema hash and per-collection info.
    #[test]
    fn test_status() {
        let (_tmp, store) = setup_test_store();
        let status = store.status().unwrap();
        assert!(status["schema_hash"].is_string());
        assert!(status["collections"].is_object());
    }

    // validate_all() reports a per-collection document count.
    #[test]
    fn test_validate_all() {
        let (_tmp, store) = setup_test_store();
        let data: serde_yaml::Value =
            serde_yaml::from_str("name: Alice\nemail: alice@test.com").unwrap();
        store.collection("users").unwrap().insert(data, None).unwrap();
        let report = store.validate_all().unwrap();
        assert!(report["users"]["total"].as_u64().unwrap() >= 1);
    }

    // update_partial overwrites only the supplied fields; name and role are
    // untouched.
    #[test]
    fn test_update_partial() {
        let (_tmp, store) = setup_test_store();
        let users = store.collection("users").unwrap();
        let data: serde_yaml::Value =
            serde_yaml::from_str("name: Alice\nemail: alice@test.com\nrole: member").unwrap();
        users.insert(data, None).unwrap();
        let partial: serde_yaml::Value =
            serde_yaml::from_str("email: alice@newdomain.com").unwrap();
        users.update_partial("alice", partial, None).unwrap();
        let doc = users.get("alice").unwrap();
        assert_eq!(
            doc.data["email"],
            serde_yaml::Value::String("alice@newdomain.com".into())
        );
        assert_eq!(
            doc.data["name"],
            serde_yaml::Value::String("Alice".into())
        );
        assert_eq!(
            doc.data["role"],
            serde_yaml::Value::String("member".into())
        );
    }

    // Writes must refresh the stored per-collection directory hash.
    #[test]
    fn test_directory_hash_updated_on_write() {
        let (_tmp, store) = setup_test_store();
        let hash_before = store.db.get_directory_hash("users").unwrap();
        let users = store.collection("users").unwrap();
        let data: serde_yaml::Value =
            serde_yaml::from_str("name: Alice\nemail: alice@test.com").unwrap();
        users.insert(data, None).unwrap();
        let hash_after = store.db.get_directory_hash("users").unwrap();
        assert_ne!(hash_before, hash_after);
    }

    // A batch of valid inserts commits all of them.
    #[test]
    fn test_batch_insert() {
        let (_tmp, store) = setup_test_store();
        let mut batch = store.batch();
        batch.collection("users").insert(
            serde_json::json!({ "name": "Alice", "email": "a@test.com" }),
            None,
        );
        batch.collection("users").insert(
            serde_json::json!({ "name": "Bob", "email": "b@test.com" }),
            None,
        );
        let results = batch.execute().unwrap();
        assert_eq!(results.len(), 2);
        let users = store.collection("users").unwrap();
        let all = users.list().unwrap();
        assert_eq!(all.len(), 2);
    }

    // If any operation in a batch fails (Charlie lacks email), the whole
    // batch rolls back, leaving only the pre-existing document.
    #[test]
    fn test_batch_rollback_on_failure() {
        let (_tmp, store) = setup_test_store();
        let users = store.collection("users").unwrap();
        let data: serde_yaml::Value =
            serde_yaml::from_str("name: Alice\nemail: alice@test.com").unwrap();
        users.insert(data, None).unwrap();
        let mut batch = store.batch();
        batch.collection("users").insert(
            serde_json::json!({ "name": "Bob", "email": "b@test.com" }),
            None,
        );
        batch.collection("users").insert(
            serde_json::json!({ "name": "Charlie" }),
            None,
        );
        let result = batch.execute();
        assert!(result.is_err());
        let all = store.collection("users").unwrap().list().unwrap();
        assert_eq!(all.len(), 1);
        assert_eq!(all[0].id, "alice");
    }

    // Like setup_test_store but with three SQL views: a materialized,
    // buffered join (post_feed) and two plain selects.
    fn setup_store_with_views() -> (TempDir, Store) {
        let tmp = TempDir::new().unwrap();
        let schema = r#"
collections:
  users:
    path: "users/{name}.md"
    fields:
      name: { type: string, required: true }
      email: { type: string, required: true }
      role: { type: string, enum: [admin, member, guest], default: member }
    additional_properties: false
    strict: true
    on_delete: error
  posts:
    path: "posts/{status}/{date:YYYY-MM-DD}-{title}.md"
    id: { on_conflict: suffix }
    fields:
      title: { type: string, required: true }
      author_id: { type: ref, target: users, required: true, on_delete: cascade }
      date: { type: date, required: true }
      tags: { type: list, items: string }
      status: { type: string, enum: [draft, published, archived], default: draft }
    content: true
    additional_properties: false
    strict: true
views:
  post_feed:
    query: |
      SELECT p.title, p.date, u.name AS author_name
      FROM posts p
      JOIN users u ON p.author_id = u.id
      WHERE p.status = 'published'
      ORDER BY p.date DESC
      LIMIT 100
    materialize: true
    buffer: 2x
  user_lookup:
    query: |
      SELECT id, name, email, role
      FROM users
      ORDER BY name ASC
    materialize: false
  all_posts:
    query: |
      SELECT id, title, status, date
      FROM posts
      ORDER BY date DESC
    materialize: false
"#;
        std::fs::write(tmp.path().join("schema.yaml"), schema).unwrap();
        std::fs::create_dir_all(tmp.path().join("users")).unwrap();
        std::fs::create_dir_all(tmp.path().join("posts")).unwrap();
        let store = Store::open(tmp.path().to_str().unwrap()).unwrap();
        (tmp, store)
    }

    // Two users, two published posts, one draft — fixture for view tests.
    fn seed_view_data(store: &Store) {
        let users = store.collection("users").unwrap();
        users.insert(
            serde_yaml::from_str("name: Alice\nemail: alice@test.com\nrole: admin").unwrap(),
            None,
        ).unwrap();
        users.insert(
            serde_yaml::from_str("name: Bob\nemail: bob@test.com\nrole: member").unwrap(),
            None,
        ).unwrap();
        let posts = store.collection("posts").unwrap();
        posts.insert(
            serde_yaml::from_str("title: First Post\nauthor_id: alice\ndate: '2026-01-10'\nstatus: published").unwrap(),
            Some("First post content"),
        ).unwrap();
        posts.insert(
            serde_yaml::from_str("title: Second Post\nauthor_id: bob\ndate: '2026-01-15'\nstatus: published").unwrap(),
            Some("Second post content"),
        ).unwrap();
        posts.insert(
            serde_yaml::from_str("title: Draft Post\nauthor_id: alice\ndate: '2026-01-20'\nstatus: draft").unwrap(),
            Some("Draft content"),
        ).unwrap();
    }

    // Simple projection view returns all users ordered by name.
    #[test]
    fn test_view_execution_user_lookup() {
        let (_tmp, store) = setup_store_with_views();
        seed_view_data(&store);
        let result = store.view_dynamic("user_lookup").unwrap();
        let rows = result.as_array().unwrap();
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0]["name"], "Alice");
        assert_eq!(rows[1]["name"], "Bob");
        assert!(rows[0]["email"].is_string());
        assert!(rows[0]["role"].is_string());
    }

    // JOIN view resolves author names and orders by date DESC.
    #[test]
    fn test_view_execution_post_feed_join() {
        let (_tmp, store) = setup_store_with_views();
        seed_view_data(&store);
        let result = store.view_dynamic("post_feed").unwrap();
        let rows = result.as_array().unwrap();
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0]["title"], "Second Post");
        assert_eq!(rows[0]["author_name"], "Bob");
        assert_eq!(rows[1]["title"], "First Post");
        assert_eq!(rows[1]["author_name"], "Alice");
    }

    // WHERE status = 'published' excludes the draft post.
    #[test]
    fn test_view_execution_where_filter() {
        let (_tmp, store) = setup_store_with_views();
        seed_view_data(&store);
        let result = store.view_dynamic("post_feed").unwrap();
        let rows = result.as_array().unwrap();
        for row in rows {
            assert!(row["author_name"].is_string());
        }
        let titles: Vec<&str> = rows.iter().filter_map(|r| r["title"].as_str()).collect();
        assert!(!titles.contains(&"Draft Post"));
    }

    // ORDER BY date DESC on the unfiltered view, newest first.
    #[test]
    fn test_view_execution_order_by() {
        let (_tmp, store) = setup_store_with_views();
        seed_view_data(&store);
        let result = store.view_dynamic("all_posts").unwrap();
        let rows = result.as_array().unwrap();
        assert_eq!(rows.len(), 3);
        assert_eq!(rows[0]["title"], "Draft Post");
        assert_eq!(rows[1]["title"], "Second Post");
        assert_eq!(rows[2]["title"], "First Post");
    }

    // LIMIT 2 in the view query caps the dynamic result at 2 of 3 users.
    #[test]
    fn test_view_execution_limit() {
        let tmp = TempDir::new().unwrap();
        let schema = r#"
collections:
  users:
    path: "users/{name}.md"
    fields:
      name: { type: string, required: true }
      email: { type: string, required: true }
    additional_properties: false
    strict: true
views:
  recent_users:
    query: |
      SELECT id, name
      FROM users
      ORDER BY name ASC
      LIMIT 2
    materialize: false
"#;
        std::fs::write(tmp.path().join("schema.yaml"), schema).unwrap();
        std::fs::create_dir_all(tmp.path().join("users")).unwrap();
        let store = Store::open(tmp.path().to_str().unwrap()).unwrap();
        let users = store.collection("users").unwrap();
        users.insert(
            serde_yaml::from_str("name: Alice\nemail: a@test.com").unwrap(),
            None,
        ).unwrap();
        users.insert(
            serde_yaml::from_str("name: Bob\nemail: b@test.com").unwrap(),
            None,
        ).unwrap();
        users.insert(
            serde_yaml::from_str("name: Charlie\nemail: c@test.com").unwrap(),
            None,
        ).unwrap();
        let result = store.view_dynamic("recent_users").unwrap();
        let rows = result.as_array().unwrap();
        assert_eq!(rows.len(), 2);
    }

    // `materialize: true` writes views/<name>.yaml containing only rows that
    // pass the view's WHERE clause.
    #[test]
    fn test_view_materialization() {
        let (tmp, store) = setup_store_with_views();
        seed_view_data(&store);
        let views_dir = tmp.path().join("views");
        let materialized = views_dir.join("post_feed.yaml");
        assert!(materialized.exists(), "Materialized view file should exist");
        let content = std::fs::read_to_string(&materialized).unwrap();
        assert!(content.contains("Second Post"));
        assert!(content.contains("First Post"));
        assert!(!content.contains("Draft Post"));
    }

    // `buffer: 2x` lets the in-memory result hold up to LIMIT*2 rows, while
    // the materialized file is still truncated to the plain LIMIT.
    #[test]
    fn test_view_buffer_multiplier() {
        let tmp = TempDir::new().unwrap();
        let schema = r#"
collections:
  users:
    path: "users/{name}.md"
    fields:
      name: { type: string, required: true }
      email: { type: string, required: true }
    additional_properties: false
    strict: true
views:
  buffered_users:
    query: |
      SELECT id, name
      FROM users
      ORDER BY name ASC
      LIMIT 2
    materialize: true
    buffer: 2x
"#;
        std::fs::write(tmp.path().join("schema.yaml"), schema).unwrap();
        std::fs::create_dir_all(tmp.path().join("users")).unwrap();
        let store = Store::open(tmp.path().to_str().unwrap()).unwrap();
        for name in &["Alice", "Bob", "Charlie", "Diana", "Eve"] {
            let data: serde_yaml::Value = serde_yaml::from_str(
                &format!("name: {name}\nemail: {}@test.com", name.to_lowercase()),
            ).unwrap();
            store.collection("users").unwrap().insert(data, None).unwrap();
        }
        let result = store.view_dynamic("buffered_users").unwrap();
        let rows = result.as_array().unwrap();
        assert!(rows.len() <= 4, "Buffer should limit to 4 rows, got {}", rows.len());
        let materialized = tmp.path().join("views/buffered_users.yaml");
        assert!(materialized.exists());
        let content = std::fs::read_to_string(&materialized).unwrap();
        let yaml_rows: Vec<serde_yaml::Value> = serde_yaml::from_str(&content).unwrap();
        assert_eq!(yaml_rows.len(), 2, "Materialized output should have exactly 2 rows");
    }

    // Collection subscribers receive an Inserted event with the new id.
    #[test]
    fn test_subscription_on_insert() {
        let (_tmp, store) = setup_test_store();
        let received = Arc::new(Mutex::new(Vec::<ChangeEvent>::new()));
        let received_clone = received.clone();
        store.on_collection_change(
            "users",
            Box::new(move |event| {
                received_clone.lock().unwrap().push(event);
            }),
        );
        let users = store.collection("users").unwrap();
        let data: serde_yaml::Value =
            serde_yaml::from_str("name: Alice\nemail: alice@test.com").unwrap();
        users.insert(data, None).unwrap();
        let events = received.lock().unwrap();
        assert_eq!(events.len(), 1);
        match &events[0] {
            ChangeEvent::Inserted { id, .. } => assert_eq!(id, "alice"),
            other => panic!("Expected Inserted event, got {:?}", other),
        }
    }

    // Insert + update produce two events; the second is Updated.
    #[test]
    fn test_subscription_on_update() {
        let (_tmp, store) = setup_test_store();
        let received = Arc::new(Mutex::new(Vec::<ChangeEvent>::new()));
        let received_clone = received.clone();
        store.on_collection_change(
            "users",
            Box::new(move |event| {
                received_clone.lock().unwrap().push(event);
            }),
        );
        let users = store.collection("users").unwrap();
        let data: serde_yaml::Value =
            serde_yaml::from_str("name: Alice\nemail: alice@test.com").unwrap();
        users.insert(data, None).unwrap();
        let updated: serde_yaml::Value =
            serde_yaml::from_str("name: Alice\nemail: alice@new.com").unwrap();
        users.update("alice", updated, None).unwrap();
        let events = received.lock().unwrap();
        assert_eq!(events.len(), 2);
        match &events[1] {
            ChangeEvent::Updated { id, .. } => assert_eq!(id, "alice"),
            other => panic!("Expected Updated event, got {:?}", other),
        }
    }

    // Insert + delete produce two events; the second is Deleted.
    #[test]
    fn test_subscription_on_delete() {
        let (_tmp, store) = setup_test_store();
        let received = Arc::new(Mutex::new(Vec::<ChangeEvent>::new()));
        let received_clone = received.clone();
        store.on_collection_change(
            "users",
            Box::new(move |event| {
                received_clone.lock().unwrap().push(event);
            }),
        );
        let users = store.collection("users").unwrap();
        let data: serde_yaml::Value =
            serde_yaml::from_str("name: Alice\nemail: alice@test.com").unwrap();
        users.insert(data, None).unwrap();
        users.delete("alice").unwrap();
        let events = received.lock().unwrap();
        assert_eq!(events.len(), 2);
        match &events[1] {
            ChangeEvent::Deleted { id } => assert_eq!(id, "alice"),
            other => panic!("Expected Deleted event, got {:?}", other),
        }
    }

    // After unsubscribe(), further writes no longer reach the callback.
    #[test]
    fn test_subscription_unsubscribe() {
        let (_tmp, store) = setup_test_store();
        let received = Arc::new(Mutex::new(Vec::<ChangeEvent>::new()));
        let received_clone = received.clone();
        let sub_id = store.on_collection_change(
            "users",
            Box::new(move |event| {
                received_clone.lock().unwrap().push(event);
            }),
        );
        let users = store.collection("users").unwrap();
        let data: serde_yaml::Value =
            serde_yaml::from_str("name: Alice\nemail: alice@test.com").unwrap();
        users.insert(data, None).unwrap();
        store.unsubscribe(sub_id);
        let data2: serde_yaml::Value =
            serde_yaml::from_str("name: Bob\nemail: bob@test.com").unwrap();
        users.insert(data2, None).unwrap();
        let events = received.lock().unwrap();
        assert_eq!(events.len(), 1, "Should only have 1 event after unsubscribe");
    }

    // View subscribers get the recomputed view rows after a dependent
    // collection write.
    #[test]
    fn test_view_subscription() {
        let (_tmp, store) = setup_store_with_views();
        let received = Arc::new(Mutex::new(Vec::<Vec<serde_json::Value>>::new()));
        let received_clone = received.clone();
        store.on_view_change(
            "user_lookup",
            Box::new(move |data| {
                received_clone.lock().unwrap().push(data.to_vec());
            }),
        );
        let users = store.collection("users").unwrap();
        let data: serde_yaml::Value =
            serde_yaml::from_str("name: Alice\nemail: alice@test.com").unwrap();
        users.insert(data, None).unwrap();
        let events = received.lock().unwrap();
        assert!(!events.is_empty(), "View subscriber should have been notified");
        let latest = events.last().unwrap();
        assert!(latest.iter().any(|row| row["name"] == "Alice"));
    }

    // list_dynamic applies simple equality filters on field values.
    #[test]
    fn test_list_dynamic_with_filters() {
        let (_tmp, store) = setup_store_with_views();
        seed_view_data(&store);
        let mut filters = HashMap::new();
        filters.insert("role".to_string(), "admin".to_string());
        let result = store.list_dynamic("users", &filters).unwrap();
        let rows = result.as_array().unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0]["name"], "Alice");
        filters.insert("role".to_string(), "member".to_string());
        let result = store.list_dynamic("users", &filters).unwrap();
        let rows = result.as_array().unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0]["name"], "Bob");
    }

    // rebuild(None) must leave views queryable with identical results.
    #[test]
    fn test_rebuild_also_rebuilds_views() {
        let (_tmp, store) = setup_store_with_views();
        seed_view_data(&store);
        let result = store.view_dynamic("user_lookup").unwrap();
        assert_eq!(result.as_array().unwrap().len(), 2);
        store.rebuild(None).unwrap();
        let result = store.view_dynamic("user_lookup").unwrap();
        assert_eq!(result.as_array().unwrap().len(), 2);
    }

    // explain_view exposes the original SQL, its CTE rewrite, and the
    // LIMIT/buffer arithmetic (100 * 2x buffer = 200).
    #[test]
    fn test_explain_view() {
        let (_tmp, store) = setup_store_with_views();
        let result = store.explain_view("post_feed").unwrap();
        assert_eq!(result["view"], "post_feed");
        assert!(result["original_sql"].as_str().unwrap().contains("SELECT"));
        assert!(result["rewritten_sql"].as_str().unwrap().contains("WITH"));
        assert_eq!(result["limit"], 100);
        assert_eq!(result["buffer_limit"], 200);
        assert_eq!(result["is_query_template"], false);
    }

    // strip_limit removes a trailing LIMIT clause and tolerates trailing
    // whitespace; statements without LIMIT are unchanged.
    #[test]
    fn test_strip_limit_basic() {
        assert_eq!(strip_limit("SELECT * FROM t LIMIT 10"), "SELECT * FROM t");
        assert_eq!(strip_limit("SELECT * FROM t"), "SELECT * FROM t");
        assert_eq!(strip_limit("SELECT * FROM t LIMIT 100 "), "SELECT * FROM t");
    }

    // LIMIT preceded by a newline (with or without indent) is also stripped.
    #[test]
    fn test_strip_limit_newline_prefix() {
        assert_eq!(strip_limit("SELECT * FROM t\nLIMIT 10"), "SELECT * FROM t");
        assert_eq!(strip_limit("SELECT * FROM t\n  LIMIT 100"), "SELECT * FROM t");
    }

    // Only the trailing LIMIT is stripped; a LIMIT inside a CTE survives.
    #[test]
    fn test_strip_limit_preserves_inner_limit() {
        let sql = "WITH t AS (SELECT * FROM x LIMIT 5) SELECT * FROM t LIMIT 10";
        let result = strip_limit(sql);
        assert_eq!(result, "WITH t AS (SELECT * FROM x LIMIT 5) SELECT * FROM t");
    }

    // Simulate an external file move (draft/ -> published/): processing the
    // watcher event must rewrite the YAML status to match the new location
    // while preserving the markdown body.
    #[test]
    fn test_file_move_reconciles_yaml_status() {
        let (tmp, store) = setup_test_store();
        let users = store.collection("users").unwrap();
        let user_data: serde_yaml::Value =
            serde_yaml::from_str("name: Alice\nemail: alice@test.com").unwrap();
        users.insert(user_data, None).unwrap();
        let posts = store.collection("posts").unwrap();
        let post_data: serde_yaml::Value = serde_yaml::from_str(
            "title: My Post\nauthor_id: alice\ndate: '2026-02-13'\nstatus: draft",
        )
        .unwrap();
        posts.insert(post_data, Some("Hello world")).unwrap();
        let draft_path = tmp.path().join("posts/draft/2026-02-13-my-post.md");
        assert!(draft_path.exists(), "Draft file should exist");
        let published_dir = tmp.path().join("posts/published");
        std::fs::create_dir_all(&published_dir).unwrap();
        let published_path = published_dir.join("2026-02-13-my-post.md");
        std::fs::rename(&draft_path, &published_path).unwrap();
        let before = document::read_document(&published_path).unwrap();
        assert_eq!(
            before.data["status"],
            serde_yaml::Value::String("draft".into()),
            "Status should still be 'draft' before reconciliation"
        );
        let event = WatcherEvent {
            path: published_path.clone(),
            kind: ChangeKind::Created,
        };
        store
            .process_single_watcher_event("posts", &event)
            .unwrap();
        let after = document::read_document(&published_path).unwrap();
        assert_eq!(
            after.data["status"],
            serde_yaml::Value::String("published".into()),
            "Status should be reconciled to 'published' after file move"
        );
        assert!(
            after.content.as_deref().unwrap().contains("Hello world"),
            "Body content should be preserved"
        );
    }

    // A Modified event for a file already at its rendered path must be a
    // no-op: the file bytes stay untouched.
    #[test]
    fn test_file_move_no_change_when_already_matching() {
        let (tmp, store) = setup_test_store();
        let users = store.collection("users").unwrap();
        let user_data: serde_yaml::Value =
            serde_yaml::from_str("name: Bob\nemail: bob@test.com").unwrap();
        users.insert(user_data, None).unwrap();
        let user_path = tmp.path().join("users/bob.md");
        assert!(user_path.exists());
        let original_content = std::fs::read_to_string(&user_path).unwrap();
        let event = WatcherEvent {
            path: user_path.clone(),
            kind: ChangeKind::Modified,
        };
        store
            .process_single_watcher_event("users", &event)
            .unwrap();
        let after_content = std::fs::read_to_string(&user_path).unwrap();
        assert_eq!(original_content, after_content, "File should not be rewritten when path already matches YAML");
    }
}