use aingle_graph::GraphDB;
use aingle_logic::RuleEngine;
use std::path::Path;
use std::sync::Arc;
use ineru::IneruMemory;
use tokio::sync::RwLock;
#[cfg(feature = "auth")]
use crate::auth::UserStore;
use crate::proofs::ProofStore;
use crate::rest::audit::AuditLog;
/// Shared, cloneable server state handed to every request handler.
///
/// All fields are `Arc`-wrapped, so `Clone` is a cheap refcount bump and every
/// clone observes the same underlying stores. Mutable stores use
/// `tokio::sync::RwLock` so handlers can await while holding a guard.
/// Several fields only exist when the corresponding Cargo feature is enabled.
#[derive(Clone)]
pub struct AppState {
    /// Primary triple store (in-memory or sled-backed, see constructors).
    pub graph: Arc<RwLock<GraphDB>>,
    /// Rule engine for logic/inference over the graph.
    pub logic: Arc<RwLock<RuleEngine>>,
    /// Ineru associative memory; optionally restored from a snapshot file.
    pub memory: Arc<RwLock<IneruMemory>>,
    /// Fan-out channel for server-sent events (see `EventBroadcaster` below).
    pub broadcaster: Arc<EventBroadcaster>,
    /// Validation-proof storage (in-memory or sled, see `with_db_path`).
    pub proof_store: Arc<ProofStore>,
    /// TTL-bounded sandbox namespace registry (see `SandboxManager` below).
    pub sandbox_manager: Arc<SandboxManager>,
    /// Request audit trail; may be file-backed (see `with_audit_path`).
    pub audit_log: Arc<RwLock<AuditLog>>,
    /// User/credential store; a default admin is seeded at construction.
    #[cfg(feature = "auth")]
    pub user_store: Arc<UserStore>,
    /// Peer-to-peer networking manager; `None` until wired up by the caller.
    #[cfg(feature = "p2p")]
    pub p2p: Option<Arc<crate::p2p::manager::P2pManager>>,
    /// Write-ahead log writer; `None` until cluster mode is initialized.
    #[cfg(feature = "cluster")]
    pub wal: Option<Arc<aingle_wal::WalWriter>>,
    /// Raft consensus handle; `None` until cluster mode is initialized.
    #[cfg(feature = "cluster")]
    pub raft: Option<openraft::Raft<aingle_raft::CortexTypeConfig, std::sync::Arc<aingle_raft::state_machine::CortexStateMachine>>>,
    /// This node's id within the cluster, once assigned.
    #[cfg(feature = "cluster")]
    pub cluster_node_id: Option<u64>,
    /// Shared secret for intra-cluster authentication, if configured.
    #[cfg(feature = "cluster")]
    pub cluster_secret: Option<String>,
    /// TLS configuration for cluster transport, if configured.
    #[cfg(feature = "cluster")]
    pub tls_server_config: Option<Arc<rustls::ServerConfig>>,
    /// DAG author identity for locally created nodes, if configured.
    #[cfg(feature = "dag")]
    pub dag_author: Option<aingle_graph::NodeId>,
    /// Monotonic sequence counter for DAG entries (starts at 1 in constructors).
    #[cfg(feature = "dag")]
    pub dag_seq_counter: std::sync::Arc<std::sync::atomic::AtomicU64>,
    /// Signing key for DAG entries, if configured.
    #[cfg(feature = "dag")]
    pub dag_signing_key: Option<std::sync::Arc<aingle_graph::dag::DagSigningKey>>,
}
impl AppState {
pub fn new() -> crate::error::Result<Self> {
let graph = GraphDB::memory()?;
let logic = RuleEngine::new();
let memory = IneruMemory::agent_mode();
#[cfg(feature = "auth")]
let user_store = {
let store = Arc::new(UserStore::new());
let _ = store.init_default_admin();
store
};
Ok(Self {
graph: Arc::new(RwLock::new(graph)),
logic: Arc::new(RwLock::new(logic)),
memory: Arc::new(RwLock::new(memory)),
broadcaster: Arc::new(EventBroadcaster::new()),
proof_store: Arc::new(ProofStore::new()),
sandbox_manager: Arc::new(SandboxManager::new()),
audit_log: Arc::new(RwLock::new(AuditLog::default())),
#[cfg(feature = "auth")]
user_store,
#[cfg(feature = "p2p")]
p2p: None,
#[cfg(feature = "cluster")]
wal: None,
#[cfg(feature = "cluster")]
raft: None,
#[cfg(feature = "cluster")]
cluster_node_id: None,
#[cfg(feature = "cluster")]
cluster_secret: None,
#[cfg(feature = "cluster")]
tls_server_config: None,
#[cfg(feature = "dag")]
dag_author: None,
#[cfg(feature = "dag")]
dag_seq_counter: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(1)),
#[cfg(feature = "dag")]
dag_signing_key: None,
})
}
pub fn with_graph(graph: GraphDB) -> Self {
let logic = RuleEngine::new();
let memory = IneruMemory::agent_mode();
#[cfg(feature = "auth")]
let user_store = {
let store = Arc::new(UserStore::new());
let _ = store.init_default_admin();
store
};
Self {
graph: Arc::new(RwLock::new(graph)),
logic: Arc::new(RwLock::new(logic)),
memory: Arc::new(RwLock::new(memory)),
broadcaster: Arc::new(EventBroadcaster::new()),
proof_store: Arc::new(ProofStore::new()),
sandbox_manager: Arc::new(SandboxManager::new()),
audit_log: Arc::new(RwLock::new(AuditLog::default())),
#[cfg(feature = "auth")]
user_store,
#[cfg(feature = "p2p")]
p2p: None,
#[cfg(feature = "cluster")]
wal: None,
#[cfg(feature = "cluster")]
raft: None,
#[cfg(feature = "cluster")]
cluster_node_id: None,
#[cfg(feature = "cluster")]
cluster_secret: None,
#[cfg(feature = "cluster")]
tls_server_config: None,
#[cfg(feature = "dag")]
dag_author: None,
#[cfg(feature = "dag")]
dag_seq_counter: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(1)),
#[cfg(feature = "dag")]
dag_signing_key: None,
}
}
pub fn with_audit_path(path: std::path::PathBuf) -> crate::error::Result<Self> {
let graph = GraphDB::memory()?;
let logic = RuleEngine::new();
let memory = IneruMemory::agent_mode();
#[cfg(feature = "auth")]
let user_store = {
let store = Arc::new(UserStore::new());
let _ = store.init_default_admin();
store
};
Ok(Self {
graph: Arc::new(RwLock::new(graph)),
logic: Arc::new(RwLock::new(logic)),
memory: Arc::new(RwLock::new(memory)),
broadcaster: Arc::new(EventBroadcaster::new()),
proof_store: Arc::new(ProofStore::new()),
sandbox_manager: Arc::new(SandboxManager::new()),
audit_log: Arc::new(RwLock::new(AuditLog::with_path(10_000, path))),
#[cfg(feature = "auth")]
user_store,
#[cfg(feature = "p2p")]
p2p: None,
#[cfg(feature = "cluster")]
wal: None,
#[cfg(feature = "cluster")]
raft: None,
#[cfg(feature = "cluster")]
cluster_node_id: None,
#[cfg(feature = "cluster")]
cluster_secret: None,
#[cfg(feature = "cluster")]
tls_server_config: None,
#[cfg(feature = "dag")]
dag_author: None,
#[cfg(feature = "dag")]
dag_seq_counter: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(1)),
#[cfg(feature = "dag")]
dag_signing_key: None,
})
}
pub fn with_db_path(
db_path: &str,
audit_log_path: Option<std::path::PathBuf>,
) -> crate::error::Result<Self> {
let graph = if db_path == ":memory:" {
GraphDB::memory()?
} else {
if let Some(parent) = Path::new(db_path).parent() {
std::fs::create_dir_all(parent).ok();
}
GraphDB::sled(db_path)?
};
let logic = RuleEngine::new();
let memory = if db_path != ":memory:" {
let snapshot_path = Path::new(db_path)
.parent()
.unwrap_or(Path::new("."))
.join("ineru.snapshot");
if snapshot_path.exists() {
match IneruMemory::load_from_file(&snapshot_path) {
Ok(mem) => {
log::info!("Loaded Ineru snapshot from {}", snapshot_path.display());
mem
}
Err(e) => {
log::warn!("Failed to load Ineru snapshot: {}. Starting fresh.", e);
IneruMemory::agent_mode()
}
}
} else {
IneruMemory::agent_mode()
}
} else {
IneruMemory::agent_mode()
};
let audit_log = if let Some(path) = audit_log_path {
AuditLog::with_path(10_000, path)
} else {
AuditLog::default()
};
let proof_store = if db_path != ":memory:" {
let proof_db_path = Path::new(db_path)
.parent()
.unwrap_or(Path::new("."))
.join("proofs.sled");
let proof_db_str = proof_db_path.to_string_lossy();
match ProofStore::with_sled(&proof_db_str) {
Ok(ps) => {
log::info!("ProofStore using Sled backend at {}", proof_db_str);
Arc::new(ps)
}
Err(e) => {
log::warn!("Failed to open Sled ProofStore: {}. Falling back to in-memory.", e);
Arc::new(ProofStore::new())
}
}
} else {
Arc::new(ProofStore::new())
};
#[cfg(feature = "auth")]
let user_store = {
let store = Arc::new(UserStore::new());
let _ = store.init_default_admin();
store
};
Ok(Self {
graph: Arc::new(RwLock::new(graph)),
logic: Arc::new(RwLock::new(logic)),
memory: Arc::new(RwLock::new(memory)),
broadcaster: Arc::new(EventBroadcaster::new()),
proof_store,
sandbox_manager: Arc::new(SandboxManager::new()),
audit_log: Arc::new(RwLock::new(audit_log)),
#[cfg(feature = "auth")]
user_store,
#[cfg(feature = "p2p")]
p2p: None,
#[cfg(feature = "cluster")]
wal: None,
#[cfg(feature = "cluster")]
raft: None,
#[cfg(feature = "cluster")]
cluster_node_id: None,
#[cfg(feature = "cluster")]
cluster_secret: None,
#[cfg(feature = "cluster")]
tls_server_config: None,
#[cfg(feature = "dag")]
dag_author: None,
#[cfg(feature = "dag")]
dag_seq_counter: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(1)),
#[cfg(feature = "dag")]
dag_signing_key: None,
})
}
pub async fn flush(&self, snapshot_dir: Option<&Path>) -> crate::error::Result<()> {
{
let graph = self.graph.read().await;
graph.flush()?;
}
if let Err(e) = self.proof_store.flush() {
log::warn!("Failed to flush proof store: {}", e);
}
if let Some(dir) = snapshot_dir {
let snapshot_path = dir.join("ineru.snapshot");
let memory = self.memory.read().await;
if let Err(e) = memory.save_to_file(&snapshot_path) {
log::warn!("Failed to save Ineru snapshot: {}", e);
} else {
log::info!("Ineru snapshot saved to {}", snapshot_path.display());
}
}
Ok(())
}
pub fn cortex_client(&self) -> crate::client::CortexInternalClient {
crate::client::CortexInternalClient::default_client()
}
pub async fn stats(&self) -> GraphStats {
let graph = self.graph.read().await;
let stats = graph.stats();
GraphStats {
triple_count: stats.triple_count,
subject_count: stats.subject_count,
predicate_count: stats.predicate_count,
object_count: stats.object_count,
connected_clients: self.broadcaster.client_count(),
}
}
}
impl Default for AppState {
    /// Equivalent to `AppState::new()`; panics if the in-memory graph cannot
    /// be created, which indicates an unrecoverable startup failure.
    fn default() -> Self {
        let state = Self::new();
        state.expect("Failed to create default AppState with in-memory graph")
    }
}
/// Point-in-time graph statistics returned by `AppState::stats`,
/// serialized directly into API responses.
#[derive(Debug, Clone, serde::Serialize)]
pub struct GraphStats {
    /// Total number of triples in the graph.
    pub triple_count: usize,
    /// Number of distinct subjects.
    pub subject_count: usize,
    /// Number of distinct predicates.
    pub predicate_count: usize,
    /// Number of distinct objects.
    pub object_count: usize,
    /// Number of currently subscribed event-stream clients.
    pub connected_clients: usize,
}
/// Fan-out hub for server events backed by a tokio broadcast channel.
pub struct EventBroadcaster {
    /// Broadcast sender; each `subscribe` call produces a new receiver.
    sender: tokio::sync::broadcast::Sender<Event>,
    // Tracked separately from the channel's receiver count; incremented in
    // subscribe() and decremented only via an explicit unsubscribe() call.
    client_count: std::sync::atomic::AtomicUsize,
}
impl EventBroadcaster {
pub fn new() -> Self {
let (sender, _) = tokio::sync::broadcast::channel(1024);
Self {
sender,
client_count: std::sync::atomic::AtomicUsize::new(0),
}
}
pub fn subscribe(&self) -> tokio::sync::broadcast::Receiver<Event> {
self.client_count
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
self.sender.subscribe()
}
pub fn unsubscribe(&self) {
let _ = self.client_count.fetch_update(
std::sync::atomic::Ordering::SeqCst,
std::sync::atomic::Ordering::SeqCst,
|current| current.checked_sub(1),
);
}
pub fn broadcast(&self, event: Event) {
let _ = self.sender.send(event);
}
pub fn client_count(&self) -> usize {
self.client_count.load(std::sync::atomic::Ordering::SeqCst)
}
}
impl Default for EventBroadcaster {
fn default() -> Self {
Self::new()
}
}
/// Server event pushed to subscribed clients, serialized as
/// `{"type": "...", "data": {...}}` via the adjacent-tag serde representation.
#[derive(Debug, Clone, serde::Serialize)]
#[serde(tag = "type", content = "data")]
pub enum Event {
    /// A triple was inserted into the graph.
    TripleAdded {
        hash: String,
        subject: String,
        predicate: String,
        object: serde_json::Value,
    },
    /// A triple was removed from the graph.
    TripleDeleted { hash: String },
    /// A validation run finished; `proof_hash` is set when a proof was stored.
    ValidationCompleted {
        hash: String,
        valid: bool,
        proof_hash: Option<String>,
    },
    /// A client connected to the event stream.
    Connected { client_id: String },
    /// Keep-alive heartbeat.
    Ping,
}
impl Event {
    /// Serializes this event to its JSON wire form; yields an empty string
    /// if serialization fails.
    pub fn to_json(&self) -> String {
        match serde_json::to_string(self) {
            Ok(json) => json,
            Err(_) => String::new(),
        }
    }
}
/// A single registered sandbox: its namespace plus TTL bookkeeping.
struct SandboxEntry {
    /// Namespace the sandbox operates in.
    namespace: String,
    /// When the sandbox was registered; expiry is measured from here.
    created_at: std::time::Instant,
    /// How long after `created_at` the sandbox remains valid.
    ttl: std::time::Duration,
}
/// Registry of active sandboxes keyed by id, with TTL-based expiry.
/// Expired entries are not evicted automatically; callers are expected to
/// poll `expired()` and call `remove()`.
pub struct SandboxManager {
    entries: RwLock<std::collections::HashMap<String, SandboxEntry>>,
}
impl SandboxManager {
pub fn new() -> Self {
Self {
entries: RwLock::new(std::collections::HashMap::new()),
}
}
pub async fn create(&self, id: String, namespace: String, ttl_seconds: u64) {
let entry = SandboxEntry {
namespace,
created_at: std::time::Instant::now(),
ttl: std::time::Duration::from_secs(ttl_seconds),
};
let mut entries = self.entries.write().await;
entries.insert(id, entry);
}
pub async fn remove(&self, id: &str) -> Option<String> {
let mut entries = self.entries.write().await;
entries.remove(id).map(|e| e.namespace)
}
pub async fn get(&self, id: &str) -> Option<String> {
let entries = self.entries.read().await;
entries.get(id).and_then(|e| {
if e.created_at.elapsed() < e.ttl {
Some(e.namespace.clone())
} else {
None
}
})
}
pub async fn expired(&self) -> Vec<String> {
let entries = self.entries.read().await;
entries
.iter()
.filter(|(_, e)| e.created_at.elapsed() >= e.ttl)
.map(|(id, _)| id.clone())
.collect()
}
}
impl Default for SandboxManager {
fn default() -> Self {
Self::new()
}
}