use async_trait::async_trait;
use std::collections::BTreeMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use super::{AuditStore, BaselineStore, StorageHealth};
use crate::error::StoreError;
use crate::models::{
AuditEvent, BaselineRecord, BaselineSummary, BaselineVersion, ListAuditEventsQuery,
ListAuditEventsResponse, ListBaselinesQuery, ListBaselinesResponse, ListVerdictsQuery,
ListVerdictsResponse, PaginationInfo, VerdictRecord,
};
/// Non-persistent, thread-safe store used for tests and single-process
/// deployments. Implements both [`BaselineStore`] and [`AuditStore`];
/// all data is lost when the store is dropped.
#[derive(Debug, Default)]
pub struct InMemoryStore {
// Baselines keyed by (project, benchmark, version); BTreeMap gives
// deterministic iteration order. Soft-deleted records remain in the map
// with `deleted == true` (see `delete` vs `hard_delete`).
#[allow(clippy::type_complexity)]
baselines: Arc<RwLock<BTreeMap<(String, String, String), BaselineRecord>>>,
// Append-only verdict log; filtered and sorted on demand at query time.
verdicts: Arc<RwLock<Vec<VerdictRecord>>>,
// Append-only audit-event log.
audit_events: Arc<RwLock<Vec<AuditEvent>>>,
}
impl InMemoryStore {
pub fn new() -> Self {
Self {
baselines: Arc::new(RwLock::new(BTreeMap::new())),
verdicts: Arc::new(RwLock::new(Vec::new())),
audit_events: Arc::new(RwLock::new(Vec::new())),
}
}
fn key(project: &str, benchmark: &str, version: &str) -> (String, String, String) {
(
project.to_string(),
benchmark.to_string(),
version.to_string(),
)
}
}
#[async_trait]
impl BaselineStore for InMemoryStore {
async fn create(&self, record: &BaselineRecord) -> Result<(), StoreError> {
let key = Self::key(&record.project, &record.benchmark, &record.version);
let mut baselines = self.baselines.write().await;
if baselines.contains_key(&key) {
return Err(StoreError::AlreadyExists(format!(
"project={}, benchmark={}, version={}",
record.project, record.benchmark, record.version
)));
}
baselines.insert(key, record.clone());
Ok(())
}
async fn get(
&self,
project: &str,
benchmark: &str,
version: &str,
) -> Result<Option<BaselineRecord>, StoreError> {
let key = Self::key(project, benchmark, version);
let baselines = self.baselines.read().await;
Ok(baselines.get(&key).filter(|r| !r.deleted).cloned())
}
async fn get_latest(
&self,
project: &str,
benchmark: &str,
) -> Result<Option<BaselineRecord>, StoreError> {
let baselines = self.baselines.read().await;
let latest = baselines
.values()
.filter(|r| r.project == project && r.benchmark == benchmark && !r.deleted)
.max_by_key(|r| r.created_at);
Ok(latest.cloned())
}
#[allow(clippy::collapsible_if)]
async fn list(
&self,
project: &str,
query: &ListBaselinesQuery,
) -> Result<ListBaselinesResponse, StoreError> {
let baselines = self.baselines.read().await;
let parsed_tags = query.parsed_tags();
let mut filtered: Vec<_> = baselines
.values()
.filter(|r| {
if r.project != project || r.deleted {
return false;
}
if let Some(ref b) = query.benchmark {
if &r.benchmark != b {
return false;
}
}
if let Some(ref p) = query.benchmark_prefix {
if !r.benchmark.starts_with(p) {
return false;
}
}
if let Some(ref gr) = query.git_ref {
if r.git_ref.as_deref() != Some(gr) {
return false;
}
}
if let Some(ref gs) = query.git_sha {
if r.git_sha.as_deref() != Some(gs) {
return false;
}
}
if let Some(since) = query.since {
if r.created_at < since {
return false;
}
}
if let Some(until) = query.until {
if r.created_at > until {
return false;
}
}
if !parsed_tags.is_empty() {
for tag in &parsed_tags {
if !r.tags.contains(tag) {
return false;
}
}
}
true
})
.collect();
filtered.sort_by(|a, b| b.created_at.cmp(&a.created_at));
let total = filtered.len() as u64;
let offset = query.offset as usize;
let limit = query.limit as usize;
let paginated: Vec<_> = filtered
.into_iter()
.skip(offset)
.take(limit)
.map(|r| {
let mut summary: BaselineSummary = r.clone().into();
if query.include_receipt {
summary.receipt = Some(r.receipt.clone());
}
summary
})
.collect();
let has_more = (offset + paginated.len()) < total as usize;
Ok(ListBaselinesResponse {
baselines: paginated,
pagination: PaginationInfo {
total,
limit: query.limit,
offset: query.offset,
has_more,
},
})
}
async fn update(&self, record: &BaselineRecord) -> Result<(), StoreError> {
let key = Self::key(&record.project, &record.benchmark, &record.version);
let mut baselines = self.baselines.write().await;
if !baselines.contains_key(&key) {
return Err(StoreError::NotFound(format!(
"project={}, benchmark={}, version={}",
record.project, record.benchmark, record.version
)));
}
baselines.insert(key, record.clone());
Ok(())
}
async fn delete(
&self,
project: &str,
benchmark: &str,
version: &str,
) -> Result<bool, StoreError> {
let key = Self::key(project, benchmark, version);
let mut baselines = self.baselines.write().await;
if let Some(record) = baselines.get_mut(&key) {
if record.deleted {
return Ok(false);
}
record.deleted = true;
return Ok(true);
}
Ok(false)
}
async fn hard_delete(
&self,
project: &str,
benchmark: &str,
version: &str,
) -> Result<bool, StoreError> {
let key = Self::key(project, benchmark, version);
let mut baselines = self.baselines.write().await;
Ok(baselines.remove(&key).is_some())
}
async fn list_versions(
&self,
project: &str,
benchmark: &str,
) -> Result<Vec<BaselineVersion>, StoreError> {
let baselines = self.baselines.read().await;
let mut versions: Vec<_> = baselines
.values()
.filter(|r| r.project == project && r.benchmark == benchmark && !r.deleted)
.map(|r| BaselineVersion {
version: r.version.clone(),
git_ref: r.git_ref.clone(),
git_sha: r.git_sha.clone(),
created_at: r.created_at,
created_by: None,
is_current: false,
source: r.source.clone(),
})
.collect();
versions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
if let Some(first) = versions.first_mut() {
first.is_current = true;
}
Ok(versions)
}
/// In-memory storage has no external dependency that can fail, so the
/// health check is unconditionally healthy.
async fn health_check(&self) -> Result<StorageHealth, StoreError> {
Ok(StorageHealth::Healthy)
}
/// Identifier reported for this backend in diagnostics output.
fn backend_type(&self) -> &'static str {
"memory"
}
async fn create_verdict(&self, record: &VerdictRecord) -> Result<(), StoreError> {
let mut verdicts = self.verdicts.write().await;
verdicts.push(record.clone());
Ok(())
}
async fn list_verdicts(
&self,
project: &str,
query: &ListVerdictsQuery,
) -> Result<ListVerdictsResponse, StoreError> {
let verdicts = self.verdicts.read().await;
let mut filtered: Vec<_> = verdicts
.iter()
.filter(|r| {
if r.project != project {
return false;
}
if let Some(ref b) = query.benchmark
&& &r.benchmark != b
{
return false;
}
if let Some(ref s) = query.status
&& &r.status != s
{
return false;
}
if let Some(since) = query.since
&& r.created_at < since
{
return false;
}
if let Some(until) = query.until
&& r.created_at > until
{
return false;
}
true
})
.cloned()
.collect();
filtered.sort_by(|a, b| b.created_at.cmp(&a.created_at));
let total = filtered.len() as u64;
let offset = query.offset as usize;
let limit = query.limit as usize;
let paginated: Vec<_> = filtered.into_iter().skip(offset).take(limit).collect();
let has_more = (offset + paginated.len()) < total as usize;
Ok(ListVerdictsResponse {
verdicts: paginated,
pagination: PaginationInfo {
total,
limit: query.limit,
offset: query.offset,
has_more,
},
})
}
}
#[async_trait]
impl AuditStore for InMemoryStore {
async fn log_event(&self, event: &AuditEvent) -> Result<(), StoreError> {
let mut events = self.audit_events.write().await;
events.push(event.clone());
Ok(())
}
async fn list_events(
&self,
query: &ListAuditEventsQuery,
) -> Result<ListAuditEventsResponse, StoreError> {
let events = self.audit_events.read().await;
let mut filtered: Vec<_> = events
.iter()
.filter(|e| {
if let Some(ref project) = query.project
&& &e.project != project
{
return false;
}
if let Some(ref action) = query.action
&& e.action.to_string() != *action
{
return false;
}
if let Some(ref resource_type) = query.resource_type
&& e.resource_type.to_string() != *resource_type
{
return false;
}
if let Some(ref actor) = query.actor
&& &e.actor != actor
{
return false;
}
if let Some(since) = query.since
&& e.timestamp < since
{
return false;
}
if let Some(until) = query.until
&& e.timestamp > until
{
return false;
}
true
})
.cloned()
.collect();
filtered.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
let total = filtered.len() as u64;
let offset = query.offset as usize;
let limit = query.limit as usize;
let paginated: Vec<_> = filtered.into_iter().skip(offset).take(limit).collect();
let has_more = (offset + paginated.len()) < total as usize;
Ok(ListAuditEventsResponse {
events: paginated,
pagination: PaginationInfo {
total,
limit: query.limit,
offset: query.offset,
has_more,
},
})
}
}