use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use tokio::sync::Mutex;
use crate::{AuditError, AuditLog, AuditResult};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RekorReceipt {
pub submission_id: String,
pub chain_head: String,
pub position: u64,
#[serde(with = "time::serde::rfc3339")]
pub submitted_at: OffsetDateTime,
}
#[async_trait]
pub trait RekorSubmitter: Send + Sync {
async fn submit(&self, chain_head: &str, position: u64) -> AuditResult<RekorReceipt>;
}
#[derive(Default)]
pub struct StubRekorSubmitter {
received: Arc<Mutex<Vec<RekorReceipt>>>,
}
impl StubRekorSubmitter {
pub fn new() -> Self {
Self::default()
}
pub async fn history(&self) -> Vec<RekorReceipt> {
self.received.lock().await.clone()
}
}
#[async_trait]
impl RekorSubmitter for StubRekorSubmitter {
async fn submit(&self, chain_head: &str, position: u64) -> AuditResult<RekorReceipt> {
let receipt = RekorReceipt {
submission_id: format!("stub_{}", uuid::Uuid::new_v4().simple()),
chain_head: chain_head.to_string(),
position,
submitted_at: OffsetDateTime::now_utc(),
};
self.received.lock().await.push(receipt.clone());
Ok(receipt)
}
}
pub struct LoggingRekorSubmitter;
#[async_trait]
impl RekorSubmitter for LoggingRekorSubmitter {
async fn submit(&self, chain_head: &str, position: u64) -> AuditResult<RekorReceipt> {
tracing::info!(
target: "aex_audit::rekor",
chain_head = chain_head,
position = position,
"chain head submitted (logging submitter)"
);
Ok(RekorReceipt {
submission_id: format!("log_{}", uuid::Uuid::new_v4().simple()),
chain_head: chain_head.to_string(),
position,
submitted_at: OffsetDateTime::now_utc(),
})
}
}
pub struct RekorAnchoredAuditLog<Inner: AuditLog + Send + Sync + 'static> {
inner: Arc<Inner>,
submitter: Arc<dyn RekorSubmitter>,
interval: Duration,
}
impl<Inner: AuditLog + Send + Sync + 'static> RekorAnchoredAuditLog<Inner> {
pub fn new(inner: Inner, submitter: Arc<dyn RekorSubmitter>, interval: Duration) -> Self {
Self {
inner: Arc::new(inner),
submitter,
interval,
}
}
pub fn spawn_submission_loop(&self) -> tokio::task::JoinHandle<()> {
let inner = self.inner.clone();
let submitter = self.submitter.clone();
let interval = self.interval;
tokio::spawn(async move {
let mut ticker = tokio::time::interval(interval);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
ticker.tick().await;
let head = match inner.current_head().await {
Ok(h) => h,
Err(e) => {
tracing::warn!(
target: "aex_audit::rekor",
error = %e,
"current_head failed; skipping Rekor submission tick"
);
continue;
}
};
let len = inner.len().await.unwrap_or(0);
if let Err(e) = submitter.submit(&head, len).await {
tracing::warn!(
target: "aex_audit::rekor",
error = %e,
"Rekor submission failed (will retry on next tick)"
);
}
}
})
}
pub async fn submit_now(&self) -> AuditResult<RekorReceipt> {
let head = self.inner.current_head().await?;
let len = self.inner.len().await?;
self.submitter.submit(&head, len).await
}
pub fn inner(&self) -> &Inner {
self.inner.as_ref()
}
}
#[async_trait]
impl<Inner: AuditLog + Send + Sync + 'static> AuditLog for RekorAnchoredAuditLog<Inner> {
async fn append(&self, event: crate::Event) -> AuditResult<crate::EventReceipt> {
self.inner.append(event).await
}
async fn current_head(&self) -> AuditResult<String> {
self.inner.current_head().await
}
async fn verify_chain(&self) -> AuditResult<()> {
self.inner.verify_chain().await
}
async fn len(&self) -> AuditResult<u64> {
self.inner.len().await
}
}
#[allow(dead_code)]
fn _sanity_check(e: AuditError) -> AuditError {
e
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{Event, EventKind, MemoryAuditLog};
#[tokio::test]
async fn submit_now_captures_current_head() {
let log = MemoryAuditLog::new();
log.append(Event::new(
EventKind::AgentRegistered,
"actor",
"subject",
serde_json::json!({}),
))
.await
.unwrap();
let stub = Arc::new(StubRekorSubmitter::new());
let anchored = RekorAnchoredAuditLog::new(log, stub.clone(), Duration::from_secs(60));
let receipt = anchored.submit_now().await.unwrap();
assert_eq!(receipt.position, 1);
assert_eq!(receipt.chain_head.len(), 64);
let history = stub.history().await;
assert_eq!(history.len(), 1);
}
#[tokio::test]
async fn wrapping_passes_through_audit_log_trait() {
let stub = Arc::new(StubRekorSubmitter::new());
let anchored =
RekorAnchoredAuditLog::new(MemoryAuditLog::new(), stub, Duration::from_secs(60));
for i in 0..3 {
anchored
.append(Event::new(
EventKind::TransferInitiated,
"",
format!("tx_{}", i),
serde_json::json!({"i": i}),
))
.await
.unwrap();
}
assert_eq!(anchored.len().await.unwrap(), 3);
anchored.verify_chain().await.unwrap();
}
#[tokio::test]
async fn background_loop_emits_after_interval() {
let stub = Arc::new(StubRekorSubmitter::new());
let anchored = Arc::new(RekorAnchoredAuditLog::new(
MemoryAuditLog::new(),
stub.clone(),
Duration::from_millis(50),
));
let handle = anchored.spawn_submission_loop();
tokio::time::sleep(Duration::from_millis(180)).await;
handle.abort();
let history = stub.history().await;
assert!(
history.len() >= 2,
"expected >=2 submissions, got {}",
history.len()
);
}
}