pub mod handlers;
mod index;
pub use index::{ContentIndex, ContentIndexStore, IndexViolation, ViolationKind};
use crate::checkpoint::{CheckpointSignature, CheckpointSigner, CosignedCheckpoint};
use crate::error::Result;
use crate::witness::{
verify_consistency, AddCheckpointRequest, CheckpointVerifier, LogConfig, WitnessError,
WitnessStateStore, WitnessedState,
};
use async_trait::async_trait;
use ed25519_dalek::Signer;
use sea_orm::DatabaseConnection;
use std::sync::Arc;
/// Outcome of checking a single log entry with a [`Monitor`].
#[derive(Debug, Clone)]
pub enum ValidationResult {
/// The entry passed validation.
Valid,
/// The entry was rejected; the payload describes the reason.
Invalid(ValidationError),
}
/// Reasons a [`Monitor`] can give for rejecting a log entry.
///
/// The `Display` text of each variant comes from its `#[error(...)]`
/// attribute.
#[derive(Debug, Clone, thiserror::Error)]
pub enum ValidationError {
/// The same SHA-256 hash was recorded at two different entry indexes.
#[error(
"duplicate SHA256: {hash} (first seen at index {first_index}, now at {current_index})"
)]
DuplicateSha256 {
hash: String,
first_index: u64,
current_index: u64,
},
/// A filename was recorded again with a different hash.
///
/// `first_index` and `current_index` are carried on the variant but are
/// not part of the rendered message.
#[error("filename '{filename}' already exists with different hash (first: {first_hash}, now: {current_hash})")]
DuplicateFilename {
filename: String,
first_hash: String,
current_hash: String,
first_index: u64,
current_index: u64,
},
/// The entry could not be parsed; the payload is the parser's message.
#[error("failed to parse entry: {0}")]
ParseError(String),
/// Any other rejection; the payload is rendered verbatim.
#[error("{0}")]
Other(String),
}
/// Pluggable entry-validation policy driven by [`MonitoringWitness`].
///
/// The witness calls these methods in a fixed pattern: `load_state` once per
/// configured log, `validate_entry` for every new entry covered by an
/// incoming checkpoint, and `commit_entries` after all of those entries
/// validated successfully.
#[async_trait]
pub trait Monitor: Send + Sync {
/// Loads whatever state the monitor keeps for the log identified by
/// `origin`, using the database connection `conn`.
async fn load_state(&self, conn: &DatabaseConnection, origin: &str) -> Result<()>;
/// Checks the entry at `index`, whose raw bytes are `data`.
///
/// `Ok(ValidationResult::Invalid(_))` means the entry was rejected;
/// `Err(_)` is treated by the witness as an internal failure.
async fn validate_entry(&self, index: u64, data: &[u8]) -> Result<ValidationResult>;
/// Called after validation succeeded for a checkpoint of the log `origin`.
///
/// The witness passes the previously witnessed tree size as `from_index`
/// and the new checkpoint size as `to_index`.
// NOTE(review): presumably the range is half-open (`from_index..to_index`),
// matching how the witness validates entries; confirm against implementors.
async fn commit_entries(
&self,
conn: &DatabaseConnection,
origin: &str,
from_index: u64,
to_index: u64,
) -> Result<()>;
/// Returns the monitor's name.
fn name(&self) -> &str;
}
/// A witness that validates new log entries with a [`Monitor`] before
/// cosigning a checkpoint.
pub struct MonitoringWitness<M: Monitor> {
// Validation policy: asked to load state, validate entries and commit them.
monitor: Arc<M>,
// Supplies the witness name, key id and signing key used for cosignatures.
signer: Arc<CheckpointSigner>,
// Database connection handed to the monitor.
conn: Arc<DatabaseConnection>,
// Persists the last witnessed size, root hash and checkpoint per origin.
state_store: WitnessStateStore,
// Known logs; an incoming checkpoint is matched to one by its origin.
logs: Vec<LogConfig>,
// HTTP client used to download entry bundles from a log's URL.
http_client: reqwest::Client,
}
impl<M: Monitor> MonitoringWitness<M> {
/// Builds a witness around `monitor` and `signer` for the given `logs`.
///
/// The database connection is shared between the witness itself and the
/// state store it creates; a fresh HTTP client is created for fetching
/// entry bundles.
pub fn new(
    monitor: Arc<M>,
    signer: Arc<CheckpointSigner>,
    conn: Arc<DatabaseConnection>,
    logs: Vec<LogConfig>,
) -> Self {
    // The store gets its own handle; the original handle stays on the witness.
    let state_store = WitnessStateStore::new(Arc::clone(&conn));
    let http_client = reqwest::Client::new();
    Self {
        monitor,
        signer,
        conn,
        state_store,
        logs,
        http_client,
    }
}
/// Asks the monitor to load its state for every configured log, in order.
///
/// Stops at, and returns, the first error reported by the monitor.
pub async fn load_state(&self) -> Result<()> {
    let origins = self.logs.iter().map(|log| &log.origin);
    for origin in origins {
        tracing::info!("Loading monitor state for log: {}", origin);
        self.monitor.load_state(&self.conn, origin).await?;
    }
    Ok(())
}
/// Returns the name the signer reports for this witness.
pub fn name(&self) -> &str {
    let signer_name = self.signer.name();
    signer_name.as_str()
}
/// Returns the name reported by the underlying monitor.
pub fn monitor_name(&self) -> &str {
    let monitor = &*self.monitor;
    monitor.name()
}
/// Processes an add-checkpoint request and returns this witness's cosignature.
///
/// Steps, in order:
/// 1. Parse `request.checkpoint` and find the configured log whose origin
///    matches the checkpoint's origin.
/// 2. Verify the checkpoint with a [`CheckpointVerifier`] built from that
///    log's configuration.
/// 3. Require `request.old_size` to be at most the checkpoint size and equal
///    to the size currently stored for the origin.
/// 4. If something was witnessed before, require either an identical root
///    (same size) or a valid consistency proof (larger size); if nothing was
///    witnessed before, require an empty proof.
/// 5. If the tree grew, download the new entries from the log's URL and run
///    them through the monitor.
/// 6. Sign the checkpoint body, let the monitor commit the new entries, and
///    persist the new size, root and checkpoint text.
///
/// # Errors
///
/// * `MonitorError::Witness` with `BadRequest`, `UnknownLog`,
///   `InvalidSignature`, `Conflict`, `InvalidProof` or `Internal`, depending
///   on which step failed.
/// * `MonitorError::Validation` when the monitor rejects one of the new
///   entries.
// NOTE(review): the stored state is read and later updated in separate calls
// with awaits in between; nothing in this function serializes concurrent
// requests for the same origin. Confirm whether callers or the store do.
pub async fn add_checkpoint(
    &self,
    request: AddCheckpointRequest,
) -> std::result::Result<CheckpointSignature, MonitorError> {
    let checkpoint = CosignedCheckpoint::from_text(&request.checkpoint).map_err(|e| {
        MonitorError::Witness(WitnessError::BadRequest(format!(
            "invalid checkpoint: {}",
            e
        )))
    })?;
    let origin = checkpoint.checkpoint.origin.as_str();
    // Build the error lazily so the success path does not allocate it.
    let log_config = self
        .logs
        .iter()
        .find(|l| l.origin == origin)
        .ok_or_else(|| MonitorError::Witness(WitnessError::UnknownLog(origin.to_string())))?;
    let verifier = CheckpointVerifier::new(log_config.clone());
    verifier
        .verify(&checkpoint)
        .map_err(|e| MonitorError::Witness(WitnessError::InvalidSignature(e.to_string())))?;
    let new_size = checkpoint.checkpoint.size.value();
    let new_root = checkpoint.checkpoint.root_hash;
    if request.old_size > new_size {
        return Err(MonitorError::Witness(WitnessError::BadRequest(format!(
            "old_size ({}) > checkpoint size ({})",
            request.old_size, new_size
        ))));
    }
    let state = self.state_store.get_or_init(origin).await.map_err(|e| {
        MonitorError::Witness(WitnessError::Internal(format!(
            "failed to get state: {}",
            e
        )))
    })?;
    // The caller must be extending exactly what this witness last saw; the
    // conflict error carries the stored size so the caller can retry.
    if request.old_size != state.size {
        return Err(MonitorError::Witness(WitnessError::Conflict(state.size)));
    }
    if state.size > 0 {
        // Already implied by the two checks above; kept as a defensive guard.
        if new_size < state.size {
            return Err(MonitorError::Witness(WitnessError::BadRequest(format!(
                "checkpoint size ({}) < witnessed size ({})",
                new_size, state.size
            ))));
        }
        if new_size == state.size {
            if new_root != state.root_hash {
                return Err(MonitorError::Witness(WitnessError::InvalidProof(
                    "same size but different roots - split view detected".to_string(),
                )));
            }
        } else {
            verify_consistency(
                state.size,
                new_size,
                &state.root_hash,
                &new_root,
                &request.proof,
            )
            .map_err(|e| MonitorError::Witness(WitnessError::InvalidProof(e.to_string())))?;
        }
    } else if !request.proof.is_empty() {
        return Err(MonitorError::Witness(WitnessError::InvalidProof(
            "non-empty proof for empty tree".to_string(),
        )));
    }
    // Entries are validated before anything is signed or persisted.
    if new_size > state.size {
        let log_url = log_config.url.as_ref().ok_or_else(|| {
            MonitorError::Witness(WitnessError::Internal("log URL not configured".to_string()))
        })?;
        self.validate_new_entries(log_url, state.size, new_size)
            .await?;
    }
    let body = checkpoint.checkpoint.to_body();
    let signature = self.signer.signing_key_ref().sign(body.as_bytes());
    let cosig = CheckpointSignature {
        name: self.signer.name().clone(),
        key_id: self.signer.key_id().clone(),
        signature,
    };
    // The cosignature is only returned after both persistence steps succeed.
    if new_size > state.size {
        self.monitor
            .commit_entries(&self.conn, origin, state.size, new_size)
            .await
            .map_err(|e| {
                MonitorError::Witness(WitnessError::Internal(format!(
                    "failed to commit entries: {}",
                    e
                )))
            })?;
    }
    self.state_store
        .update(origin, new_size, new_root, &request.checkpoint)
        .await
        .map_err(|e| {
            MonitorError::Witness(WitnessError::Internal(format!(
                "failed to update state: {}",
                e
            )))
        })?;
    Ok(cosig)
}
/// Downloads and validates every log entry in `from_index..to_index`.
///
/// Entries are fetched one bundle (up to 256 entries) at a time from
/// `log_url`, using `to_index` as the tree size, and each entry inside the
/// range is passed to the monitor.
///
/// A bundle that holds fewer entries than are needed to cover the range is
/// rejected: otherwise a log could serve a short or empty bundle and have the
/// missing entries skipped without validation.
///
/// # Errors
///
/// * `MonitorError::Validation` when the monitor rejects an entry.
/// * `MonitorError::Witness(WitnessError::Internal)` when a bundle cannot be
///   fetched, is too short, or the monitor itself fails.
async fn validate_new_entries(
    &self,
    log_url: &str,
    from_index: u64,
    to_index: u64,
) -> std::result::Result<(), MonitorError> {
    tracing::info!(
        "Validating entries {} to {} from {}",
        from_index,
        to_index,
        log_url
    );
    const BUNDLE_SIZE: u64 = 256;
    let mut current = from_index;
    while current < to_index {
        let bundle_index = current / BUNDLE_SIZE;
        let bundle_start = bundle_index * BUNDLE_SIZE;
        let entries = self
            .fetch_entry_bundle(log_url, bundle_index, to_index)
            .await?;
        // `bundle_start <= current < to_index`, so the subtraction cannot
        // underflow. The bundle must reach `to_index` or be completely full.
        let required = (to_index - bundle_start).min(BUNDLE_SIZE);
        // usize -> u64 is a widening conversion on supported targets.
        let available = entries.len() as u64;
        if available < required {
            return Err(MonitorError::Witness(WitnessError::Internal(format!(
                "entry bundle {} is too short: expected at least {} entries, got {}",
                bundle_index, required, available
            ))));
        }
        for (offset, entry_data) in entries.iter().enumerate() {
            let entry_index = bundle_start + offset as u64;
            // Only the first bundle can contain entries before the range.
            if entry_index < from_index {
                continue;
            }
            if entry_index >= to_index {
                break;
            }
            match self.monitor.validate_entry(entry_index, entry_data).await {
                Ok(ValidationResult::Valid) => {
                    tracing::debug!("Entry {} validated successfully", entry_index);
                }
                Ok(ValidationResult::Invalid(err)) => {
                    tracing::warn!("Entry {} validation failed: {}", entry_index, err);
                    return Err(MonitorError::Validation(err));
                }
                Err(e) => {
                    return Err(MonitorError::Witness(WitnessError::Internal(format!(
                        "validation error at index {}: {}",
                        entry_index, e
                    ))));
                }
            }
        }
        // Continue at the first entry of the next bundle.
        current = bundle_start + BUNDLE_SIZE;
    }
    Ok(())
}
/// Downloads and parses the entry bundle `bundle_index` from `log_url`.
///
/// `tree_size` decides which resource is requested: when the tree ends inside
/// this bundle, the partial path carrying the number of available entries is
/// used; otherwise the full-bundle path is used. The request goes to
/// `<log_url>/tile/entries/<path>`.
///
/// # Errors
///
/// Returns `MonitorError::Witness(WitnessError::Internal)` when the bundle
/// starts at or beyond `tree_size` (including index overflow), when the HTTP
/// request fails or returns a non-success status, or when the body cannot be
/// parsed as an entry bundle.
// NOTE(review): no request timeout is set in this function; whether the
// client enforces one depends on how `http_client` is configured.
async fn fetch_entry_bundle(
    &self,
    log_url: &str,
    bundle_index: u64,
    tree_size: u64,
) -> std::result::Result<Vec<Vec<u8>>, MonitorError> {
    const BUNDLE_SIZE: u64 = 256;
    // Reject bundles outside the tree up front, so the subtraction below
    // cannot underflow and a zero-width partial path is never requested.
    let bundle_start = bundle_index
        .checked_mul(BUNDLE_SIZE)
        .filter(|start| *start < tree_size)
        .ok_or_else(|| {
            MonitorError::Witness(WitnessError::Internal(format!(
                "entry bundle {} lies outside tree of size {}",
                bundle_index, tree_size
            )))
        })?;
    // Number of entries the tree holds from the start of this bundle onwards.
    let available = tree_size - bundle_start;
    let path = if available < BUNDLE_SIZE {
        format_entry_path_with_partial(bundle_index, available)
    } else {
        format_entry_path(bundle_index)
    };
    let url = format!("{}/tile/entries/{}", log_url.trim_end_matches('/'), path);
    tracing::debug!("Fetching entry bundle from {}", url);
    let response = self.http_client.get(&url).send().await.map_err(|e| {
        MonitorError::Witness(WitnessError::Internal(format!(
            "failed to fetch entries: {}",
            e
        )))
    })?;
    if !response.status().is_success() {
        return Err(MonitorError::Witness(WitnessError::Internal(format!(
            "failed to fetch entries: HTTP {}",
            response.status()
        ))));
    }
    let data = response.bytes().await.map_err(|e| {
        MonitorError::Witness(WitnessError::Internal(format!(
            "failed to read entry bundle: {}",
            e
        )))
    })?;
    parse_entry_bundle(&data).map_err(|e| {
        MonitorError::Witness(WitnessError::Internal(format!(
            "failed to parse entry bundle: {}",
            e
        )))
    })
}
/// Looks up the stored witnessed state for `origin` in the state store.
pub async fn get_state(&self, origin: &str) -> Result<Option<WitnessedState>> {
    let lookup = self.state_store.get(origin);
    lookup.await
}
}
/// Renders a bundle index as a tile path of zero-padded 3-digit groups.
///
/// Groups are joined with `/`, and every group except the last is prefixed
/// with `x`; for example `1234` becomes `x001/234` and `0` becomes `000`.
fn format_entry_path(index: u64) -> String {
    // Split into base-1000 digits, least significant group first.
    let mut groups: Vec<u64> = Vec::new();
    let mut rest = index;
    loop {
        groups.push(rest % 1000);
        rest /= 1000;
        if rest == 0 {
            break;
        }
    }
    let last = groups.len() - 1;
    groups
        .iter()
        .rev()
        .enumerate()
        .map(|(pos, group)| {
            if pos < last {
                format!("x{:03}", group)
            } else {
                format!("{:03}", group)
            }
        })
        .collect::<Vec<_>>()
        .join("/")
}
/// Renders the path of a partial bundle: the regular path for `index`
/// followed by `.p/` and the entry count `partial`.
fn format_entry_path_with_partial(index: u64, partial: u64) -> String {
    let mut path = format_entry_path(index);
    path.push_str(".p/");
    path.push_str(&partial.to_string());
    path
}
/// Splits an entry bundle into its individual entries.
///
/// The bundle is read as a sequence of records, each a 2-byte big-endian
/// length followed by that many bytes of entry data. Empty input yields an
/// empty list.
///
/// Returns an error message when the input ends inside a length prefix or
/// before an entry's declared length is reached.
fn parse_entry_bundle(data: &[u8]) -> std::result::Result<Vec<Vec<u8>>, String> {
    const PREFIX_LEN: usize = 2;
    let mut parsed = Vec::new();
    let mut remaining = data;
    while !remaining.is_empty() {
        if remaining.len() < PREFIX_LEN {
            return Err("truncated length prefix".to_string());
        }
        let (prefix, rest) = remaining.split_at(PREFIX_LEN);
        let entry_len = usize::from(u16::from_be_bytes([prefix[0], prefix[1]]));
        if rest.len() < entry_len {
            return Err(format!("truncated entry: expected {} bytes", entry_len));
        }
        let (entry, tail) = rest.split_at(entry_len);
        parsed.push(entry.to_vec());
        remaining = tail;
    }
    Ok(parsed)
}
/// Error returned by [`MonitoringWitness`] operations.
///
/// Both variants are marked `#[from]`, so `WitnessError` and
/// `ValidationError` values convert into this type.
#[derive(Debug, thiserror::Error)]
pub enum MonitorError {
/// A witness-level failure; displayed exactly as the wrapped error.
#[error("{0}")]
Witness(#[from] WitnessError),
/// The monitor rejected a log entry.
#[error("validation failed: {0}")]
Validation(#[from] ValidationError),
}
impl MonitorError {
pub fn status_code(&self) -> u16 {
match self {
MonitorError::Witness(e) => e.status_code(),
MonitorError::Validation(_) => 422, }
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Single-level, two-level and three-level tile paths.
    #[test]
    fn test_format_entry_path() {
        assert_eq!(format_entry_path(0), "000");
        assert_eq!(format_entry_path(1), "001");
        assert_eq!(format_entry_path(123), "123");
        assert_eq!(format_entry_path(1000), "x001/000");
        assert_eq!(format_entry_path(1234), "x001/234");
        assert_eq!(format_entry_path(123456), "x123/456");
        // Every group except the last carries the `x` prefix.
        assert_eq!(format_entry_path(1234067), "x001/x234/067");
    }

    /// Well-formed bundles: empty, one entry, several entries, empty entry.
    #[test]
    fn test_parse_entry_bundle() {
        assert_eq!(parse_entry_bundle(&[]).unwrap(), Vec::<Vec<u8>>::new());
        let bundle = vec![0x00, 0x03, 0x01, 0x02, 0x03];
        let entries = parse_entry_bundle(&bundle).unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0], vec![1, 2, 3]);
        let bundle = vec![0x00, 0x02, 0xAA, 0xBB, 0x00, 0x01, 0xCC];
        let entries = parse_entry_bundle(&bundle).unwrap();
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0], vec![0xAA, 0xBB]);
        assert_eq!(entries[1], vec![0xCC]);
        // A zero-length entry is a valid record.
        let entries = parse_entry_bundle(&[0x00, 0x00]).unwrap();
        assert_eq!(entries, vec![Vec::<u8>::new()]);
    }

    /// Input that ends inside a length prefix or inside an entry is rejected.
    #[test]
    fn test_parse_entry_bundle_rejects_truncated_input() {
        assert_eq!(
            parse_entry_bundle(&[0x00]).unwrap_err(),
            "truncated length prefix"
        );
        assert_eq!(
            parse_entry_bundle(&[0x00, 0x05, 0x01]).unwrap_err(),
            "truncated entry: expected 5 bytes"
        );
    }
}