use std::path::PathBuf;
use std::process::ExitCode;
use djogi::__bypass::RawAccessExt as _;
use djogi::config::DjogiConfig;
use djogi::migrate::{
FilesystemBucket, SNAPSHOT_FILENAME, app_dirname, migrations_root, resolve_audit_url,
scan_filesystem, signature_to_hex,
};
use djogi::pg::pool::DjogiPool;
use djogi::snapshot::sign::{SnapshotKeyError, load_signing_key_from_env, sign_snapshot};
/// Failures that abort the snapshot verification command.
///
/// Signature mismatches are *not* represented here: `run` reports them on
/// stderr and signals them through its returned `ExitCode` instead.
#[derive(Debug)]
pub enum VerifyError {
/// A filesystem operation failed. `path` is the location being accessed
/// (the migrations root when the filesystem scan fails, or the snapshot
/// file when reading its metadata/contents fails).
Io {
path: PathBuf,
source: std::io::Error,
},
/// The signing key loader returned an error; the `Display` text attributes
/// it to `DJOGI_SNAPSHOT_SIGNING_KEY`.
KeyDecode(SnapshotKeyError),
/// Connecting to the audit database at `url` failed, or a query against it
/// failed for a reason other than the audit table being absent.
AuditPoolUnreachable {
url: String,
message: String,
},
/// Configuration-level failure carrying a pre-rendered message: config
/// loading, audit URL resolution, or the Postgres version preflight.
Config(String),
/// The snapshot path is a symlink; it is rejected rather than followed.
SymlinkSnapshot {
path: PathBuf,
},
}
impl std::fmt::Display for VerifyError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
VerifyError::Io { path, source } => {
write!(f, "I/O error at {}: {source}", path.display())
}
VerifyError::KeyDecode(err) => {
write!(f, "DJOGI_SNAPSHOT_SIGNING_KEY: {err}")
}
VerifyError::AuditPoolUnreachable { url, message } => write!(
f,
"audit DB at `{url}` unreachable: {message} \
(set DJOGI_CRUD_LOG_URL or check Djogi.toml::database.url)",
),
VerifyError::Config(message) => write!(f, "config load: {message}"),
VerifyError::SymlinkSnapshot { path } => write!(
f,
"snapshot path is a symlink; refusing to follow to prevent path-traversal escapes: {} \
(replace the symlink with the real `schema_snapshot.json` file or remove the \
offending entry from the migrations tree)",
path.display()
),
}
}
}
impl std::error::Error for VerifyError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
VerifyError::Io { source, .. } => Some(source),
VerifyError::KeyDecode(err) => Some(err),
VerifyError::AuditPoolUnreachable { .. }
| VerifyError::Config(_)
| VerifyError::SymlinkSnapshot { .. } => None,
}
}
}
/// Cross-checks each on-disk schema snapshot against the signature recorded
/// in the `djogi_ddl_audit` table of the audit database.
///
/// `workspace` defaults to the current directory (or `"."` when that cannot
/// be determined). For every bucket found by the filesystem scan the snapshot
/// file is signed with the configured key and compared, ASCII-case-insensitively,
/// to the most recent stored signature.
///
/// Output: `OK <path>` on stdout for matches; `MISMATCH …` and `warn: …`
/// lines on stderr. Missing snapshots, a missing audit table, and missing
/// audit rows are skipped rather than treated as failures.
///
/// # Returns
/// `ExitCode::SUCCESS` when no mismatch was seen, `ExitCode::from(1)` otherwise.
///
/// # Errors
/// Returns a [`VerifyError`] when config loading, key loading, the filesystem
/// scan, audit URL resolution, the database connection, the Postgres version
/// preflight, a snapshot read, or an audit query fails.
pub async fn run(workspace: Option<PathBuf>) -> Result<ExitCode, VerifyError> {
    let workspace = match workspace {
        Some(dir) => dir,
        None => std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
    };

    let config = match DjogiConfig::load_from_workspace(&workspace) {
        Ok(loaded) => loaded,
        Err(e) => return Err(VerifyError::Config(e.to_string())),
    };

    // When the loader reports that no key is set, an all-zero 32-byte key is used.
    let key = load_signing_key_from_env()
        .map_err(VerifyError::KeyDecode)?
        .unwrap_or([0u8; 32]);

    let scanned = match scan_filesystem(&workspace) {
        Ok(found) => found,
        Err(source) => {
            return Err(VerifyError::Io {
                path: migrations_root(&workspace),
                source,
            });
        }
    };
    let mut buckets: Vec<FilesystemBucket> = scanned.into_iter().collect();
    // Sorted so that reporting order is deterministic.
    buckets.sort();

    let audit_url = match resolve_audit_url(&config) {
        Ok(url) => url,
        Err(e) => return Err(VerifyError::Config(e.to_string())),
    };
    let audit_pool = match DjogiPool::connect(&audit_url).await {
        Ok(connected) => connected,
        Err(e) => {
            return Err(VerifyError::AuditPoolUnreachable {
                url: audit_url,
                message: e.to_string(),
            });
        }
    };
    djogi::pg::preflight::check_postgres_version(&audit_pool)
        .await
        .map_err(|e| VerifyError::Config(format!("support boundary: {e}")))?;

    let audit_url_for_log = audit_url.as_str();
    let mut audit_ctx = djogi::context::DjogiContext::from_pool(audit_pool);
    let migrations_dir = workspace.join("migrations");
    let mut exit = ExitCode::SUCCESS;

    for bucket in &buckets {
        let snapshot = migrations_dir
            .join(&bucket.database)
            .join(app_dirname(&bucket.app))
            .join(SNAPSHOT_FILENAME);

        // A bucket without a regular snapshot file has nothing to verify.
        let Some(bytes) = read_snapshot_bytes(&snapshot)? else {
            continue;
        };
        let signature = sign_snapshot(&bytes, &key);
        let computed_hex = signature_to_hex(&signature);

        // An empty app label is reported as `_global_` in diagnostics.
        let shown_app: &str = if bucket.app.is_empty() {
            "_global_"
        } else {
            &bucket.app
        };

        let lookup = fetch_audit_signature(
            &mut audit_ctx,
            &bucket.database,
            &bucket.app,
            audit_url_for_log,
        )
        .await;

        match lookup {
            Err(FetchAuditError::TableAbsent) => {
                eprintln!(
                    "warn: djogi_ddl_audit absent on `{audit_url_for_log}` — \
                     skipping cross-check for {}/{} (snapshot at {})",
                    bucket.database,
                    shown_app,
                    snapshot.display()
                );
            }
            Err(FetchAuditError::Other(message)) => {
                return Err(VerifyError::AuditPoolUnreachable {
                    url: audit_url_for_log.to_owned(),
                    message,
                });
            }
            Ok(None) => {
                eprintln!(
                    "warn: no djogi_ddl_audit row for {}/{} — skipping",
                    bucket.database, shown_app
                );
            }
            Ok(Some(stored_hex)) => {
                if eq_ignore_ascii_case_hex(&stored_hex, &computed_hex) {
                    println!("OK {}", snapshot.display());
                } else {
                    eprintln!(
                        "MISMATCH {}: expected {stored_hex}, got {computed_hex}",
                        snapshot.display()
                    );
                    // Keep going so every mismatch is reported in one run.
                    exit = ExitCode::from(1);
                }
            }
        }
    }

    Ok(exit)
}
fn read_snapshot_bytes(snapshot: &std::path::Path) -> Result<Option<Vec<u8>>, VerifyError> {
let meta = match std::fs::symlink_metadata(snapshot) {
Ok(m) => m,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
Err(e) => {
return Err(VerifyError::Io {
path: snapshot.to_path_buf(),
source: e,
});
}
};
if meta.file_type().is_symlink() {
return Err(VerifyError::SymlinkSnapshot {
path: snapshot.to_path_buf(),
});
}
if !meta.is_file() {
return Ok(None);
}
let bytes = std::fs::read(snapshot).map_err(|e| VerifyError::Io {
path: snapshot.to_path_buf(),
source: e,
})?;
Ok(Some(bytes))
}
/// Internal failure modes of `fetch_audit_signature`.
enum FetchAuditError {
/// The query failed with SQLSTATE `UNDEFINED_TABLE`, i.e. the
/// `djogi_ddl_audit` table does not exist on the audit database.
TableAbsent,
/// Any other query or row-decoding failure, carried as rendered text.
Other(String),
}
async fn fetch_audit_signature(
ctx: &mut djogi::context::DjogiContext,
target_database: &str,
app_label: &str,
_audit_url: &str,
) -> Result<Option<String>, FetchAuditError> {
let sql = "SELECT snapshot_signature_hex FROM djogi_ddl_audit \
WHERE target_database = $1 AND app_label = $2 \
AND snapshot_signature_hex IS NOT NULL \
ORDER BY id DESC LIMIT 1";
match ctx.raw_rows(sql, &[&target_database, &app_label]).await {
Ok(rows) => {
if let Some(row) = rows.first() {
let hex: Option<String> = row.try_get(0).map_err(|e| {
FetchAuditError::Other(format!("decoding snapshot_signature_hex: {e}"))
})?;
Ok(hex)
} else {
Ok(None)
}
}
Err(djogi::DjogiError::Db(db)) => {
if let Some(code) = db.code()
&& code == &tokio_postgres::error::SqlState::UNDEFINED_TABLE
{
Err(FetchAuditError::TableAbsent)
} else {
Err(FetchAuditError::Other(db.to_string()))
}
}
Err(other) => Err(FetchAuditError::Other(other.to_string())),
}
}
/// Compares two hex strings for equality, ignoring ASCII letter case.
///
/// Strings of different lengths never compare equal. No validation that the
/// inputs are actually hexadecimal is performed.
fn eq_ignore_ascii_case_hex(a: &str, b: &str) -> bool {
    // `str::eq_ignore_ascii_case` already performs the length check and the
    // bytewise ASCII-case-insensitive comparison; no need to hand-roll it.
    a.eq_ignore_ascii_case(b)
}
// Test-only alias of the imported `SNAPSHOT_FILENAME`; the unit tests assert
// its literal value through this name.
#[cfg(test)]
const TEST_SNAPSHOT_FILENAME: &str = SNAPSHOT_FILENAME;
#[cfg(test)]
mod tests {
    use super::*;
    use djogi::migrate::AuditUrlError;

    /// Produces a `(nanos, n)` pair used to build collision-free scratch paths:
    /// the wall-clock nanoseconds since the Unix epoch plus a process-wide counter.
    fn unique_stamp() -> (u128, usize) {
        use std::sync::atomic::{AtomicUsize, Ordering};
        static COUNTER: AtomicUsize = AtomicUsize::new(0);
        let n = COUNTER.fetch_add(1, Ordering::SeqCst);
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        (nanos, n)
    }

    /// Case differences are ignored; content and length differences are not.
    #[test]
    fn eq_ignore_ascii_case_hex_uppercase_lowercase() {
        let zeros = "0".repeat(64);
        assert!(eq_ignore_ascii_case_hex("DEADBEEF", "deadbeef"));
        assert!(eq_ignore_ascii_case_hex(&zeros, &zeros));
        assert!(!eq_ignore_ascii_case_hex("DEADBEEF", "DEADBEEE"));
        assert!(!eq_ignore_ascii_case_hex("DEAD", "DEADBEEF"));
    }

    /// The resolver's self-audit error keeps its actionable wording once it is
    /// wrapped in `VerifyError::Config`.
    #[test]
    fn audit_url_self_audit_maps_to_verify_config_with_actionable_message() {
        let resolver_error = AuditUrlError::SelfAudit {
            application_url: "postgres://localhost/crud_log".to_string(),
        };
        let mapped = VerifyError::Config(resolver_error.to_string());
        let display = mapped.to_string();
        assert!(
            display.contains("audit URL derivation produced the same URL"),
            "mapped Display must surface the resolver's actionable language; got: {display}",
        );
        assert!(
            display.contains("postgres://localhost/crud_log"),
            "mapped Display must echo the offending URL; got: {display}",
        );
        assert!(
            display.contains("CRUD_LOG_URL"),
            "mapped Display must point at the env-var override; got: {display}",
        );
    }

    /// A snapshot path that is a symlink to a file outside the workspace is
    /// rejected instead of being read.
    #[cfg(unix)]
    #[test]
    fn verify_rejects_symlink_snapshot() {
        use std::fs;
        use std::os::unix::fs::symlink;

        let (nanos, n) = unique_stamp();
        let tmp = std::env::temp_dir();
        let workspace = tmp.join(format!("djogi-cli-verify-symlink-{nanos}-{n}"));
        fs::create_dir_all(&workspace).unwrap();
        let outside_target = tmp.join(format!("djogi-cli-verify-outside-{nanos}-{n}.txt"));
        fs::write(&outside_target, b"attacker-controlled bytes").unwrap();

        let app_dir = workspace.join("migrations/main/_global_");
        fs::create_dir_all(&app_dir).unwrap();
        let snapshot_link = app_dir.join("schema_snapshot.json");
        symlink(&outside_target, &snapshot_link).unwrap();

        let result = read_snapshot_bytes(&snapshot_link);

        // Best-effort cleanup before asserting, so a failing assertion leaves no residue.
        let _ = fs::remove_file(&outside_target);
        let _ = fs::remove_dir_all(&workspace);

        match result {
            Err(VerifyError::SymlinkSnapshot { path }) => {
                assert!(
                    path.ends_with("schema_snapshot.json"),
                    "SymlinkSnapshot path must point at the in-workspace symlink, got: {}",
                    path.display()
                );
                let display = VerifyError::SymlinkSnapshot { path }.to_string();
                assert!(
                    display.contains("snapshot path is a symlink"),
                    "Display must be operator-actionable, got: {display}"
                );
                assert!(
                    display.contains("refusing to follow"),
                    "Display must explain the refusal, got: {display}"
                );
            }
            other => panic!(
                "expected VerifyError::SymlinkSnapshot rejecting symlinked snapshot, got: {other:?}"
            ),
        }
    }

    /// A path that does not exist yields `Ok(None)` rather than an error.
    #[test]
    fn read_snapshot_bytes_returns_none_for_missing_file() {
        let (nanos, n) = unique_stamp();
        let missing =
            std::env::temp_dir().join(format!("djogi-cli-verify-missing-{nanos}-{n}.json"));
        assert!(!missing.exists());

        let result = read_snapshot_bytes(&missing);
        assert!(
            matches!(result, Ok(None)),
            "missing file must return Ok(None), got: {result:?}"
        );
    }

    /// A regular file is returned byte-for-byte.
    #[test]
    fn read_snapshot_bytes_returns_bytes_for_regular_file() {
        use std::fs;

        let (nanos, n) = unique_stamp();
        let path = std::env::temp_dir().join(format!("djogi-cli-verify-regular-{nanos}-{n}.json"));
        fs::write(&path, b"{\"x\":1}").unwrap();

        let result = read_snapshot_bytes(&path);
        let _ = fs::remove_file(&path);

        match result {
            Ok(Some(bytes)) => assert_eq!(bytes, b"{\"x\":1}"),
            other => panic!("expected Ok(Some(bytes)), got: {other:?}"),
        }
    }

    /// Pins the snapshot filename that the path construction relies on.
    #[test]
    fn snapshot_filename_constant_matches_upstream() {
        assert_eq!(TEST_SNAPSHOT_FILENAME, "schema_snapshot.json");
    }
}