use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use chrono::{DateTime, Utc};
use super::{EdgeRecord, EdgeStoreError, NewEdge, VALID_EDGE_TYPES};
struct EdgeData {
records: Vec<EdgeRecord>,
next_id: i64,
by_source_type: HashMap<([u8; 16], String), Vec<usize>>,
by_target_type: HashMap<([u8; 16], String), Vec<usize>>,
by_source: HashMap<[u8; 16], Vec<usize>>,
by_target: HashMap<[u8; 16], Vec<usize>>,
}
impl EdgeData {
fn new() -> Self {
Self {
records: Vec::new(),
next_id: 1,
by_source_type: HashMap::new(),
by_target_type: HashMap::new(),
by_source: HashMap::new(),
by_target: HashMap::new(),
}
}
}
#[derive(Clone)]
pub struct InMemoryEdgeStore {
data: Arc<RwLock<EdgeData>>,
}
impl InMemoryEdgeStore {
pub fn new() -> Self {
Self {
data: Arc::new(RwLock::new(EdgeData::new())),
}
}
pub fn insert(&self, edge: NewEdge) -> Result<i64, EdgeStoreError> {
if !VALID_EDGE_TYPES.contains(&edge.edge_type.as_str()) {
return Err(EdgeStoreError::InvalidEdgeType(edge.edge_type));
}
let mut data = self.data.write().expect("edge store lock poisoned");
let id = data.next_id;
data.next_id += 1;
let idx = data.records.len();
data.by_source_type
.entry((edge.source_agent_id, edge.edge_type.clone()))
.or_default()
.push(idx);
data.by_target_type
.entry((edge.target_agent_id, edge.edge_type.clone()))
.or_default()
.push(idx);
data.by_source.entry(edge.source_agent_id).or_default().push(idx);
data.by_target.entry(edge.target_agent_id).or_default().push(idx);
data.records.push(EdgeRecord {
id,
source_agent_id: edge.source_agent_id,
target_agent_id: edge.target_agent_id,
edge_type: edge.edge_type,
created_at: Utc::now(),
metadata: edge.metadata,
});
Ok(id)
}
pub fn list_outgoing(&self, source: [u8; 16], edge_type: Option<&str>, limit: usize) -> Vec<EdgeRecord> {
let limit = limit.min(1000);
let data = self.data.read().expect("edge store lock poisoned");
let idxs: &[usize] = match edge_type {
Some(et) => data
.by_source_type
.get(&(source, et.to_string()))
.map(Vec::as_slice)
.unwrap_or_default(),
None => data.by_source.get(&source).map(Vec::as_slice).unwrap_or_default(),
};
idxs.iter()
.rev()
.take(limit)
.map(|&i| data.records[i].clone())
.collect()
}
pub fn list_incoming(&self, target: [u8; 16], edge_type: Option<&str>, limit: usize) -> Vec<EdgeRecord> {
let limit = limit.min(1000);
let data = self.data.read().expect("edge store lock poisoned");
let idxs: &[usize] = match edge_type {
Some(et) => data
.by_target_type
.get(&(target, et.to_string()))
.map(Vec::as_slice)
.unwrap_or_default(),
None => data.by_target.get(&target).map(Vec::as_slice).unwrap_or_default(),
};
idxs.iter()
.rev()
.take(limit)
.map(|&i| data.records[i].clone())
.collect()
}
pub fn list_by_type(&self, edge_type: &str, since: DateTime<Utc>, limit: usize) -> Vec<EdgeRecord> {
let limit = limit.min(1000);
let data = self.data.read().expect("edge store lock poisoned");
data.records
.iter()
.filter(|r| r.edge_type == edge_type && r.created_at >= since)
.rev()
.take(limit)
.cloned()
.collect()
}
}
impl Default for InMemoryEdgeStore {
fn default() -> Self {
Self::new()
}
}