use std::sync::{Arc, RwLock};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tracing::{error, info, warn, Level};
use tracing_subscriber::FmtSubscriber;
use mindfry::persistence::{AkashicConfig, AkashicStore};
use mindfry::protocol::{CommandHandler, MfbpCodec, Request};
use mindfry::{MindFry, MindFryConfig};
const DEFAULT_PORT: u16 = 9527;
const MAX_FRAME_SIZE: usize = 16 * 1024 * 1024;
struct ServerConfig {
host: String,
port: u16,
max_connections: usize,
}
impl Default for ServerConfig {
fn default() -> Self {
Self {
host: "0.0.0.0".into(),
port: DEFAULT_PORT,
max_connections: 1024,
}
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let subscriber = FmtSubscriber::builder()
.with_max_level(Level::INFO)
.finish();
tracing::subscriber::set_global_default(subscriber)?;
println!();
println!(" ╔═══════════════════════════════════════════════════════════╗");
println!(" ║ Memory with a Conscience ║");
println!(" ║ COGNITIVE DB ENGINE ║");
println!(" ╚═══════════════════════════════════════════════════════════╝");
println!();
let server_config = ServerConfig::default();
println!(" ┌─ Initialization ─────────────────────────────────────────┐");
print!(" │ 📁 Mounting Akashic Records...");
std::io::Write::flush(&mut std::io::stdout())?;
let store_config = AkashicConfig::default();
let store = match AkashicStore::open(store_config) {
Ok(s) => {
println!(" ✓");
Arc::new(s)
}
Err(e) => {
println!(" ✗");
error!("Failed to open storage: {}", e);
return Err(e.into());
}
};
print!(" │ 🔍 Checking recovery state...");
std::io::Write::flush(&mut std::io::stdout())?;
let last_marker = store.read_shutdown_marker().ok().flatten();
let recovery_analyzer = mindfry::stability::RecoveryAnalyzer::new(last_marker);
let recovery_state = recovery_analyzer.analyze();
match recovery_state {
mindfry::stability::RecoveryState::Normal => println!(" ✓ (clean)"),
mindfry::stability::RecoveryState::Shock => {
println!(" ⚠ (SHOCK detected)");
warn!("🩹 Unclean shutdown detected - building resistance");
}
mindfry::stability::RecoveryState::Coma => {
println!(" ⚠ (COMA detected)");
warn!(
"😴 Prolonged downtime: {}s",
recovery_analyzer.downtime_secs()
);
}
}
print!(" │ 🧠 Initializing Psyche Arena...");
std::io::Write::flush(&mut std::io::stdout())?;
let db_config = MindFryConfig::default();
let db = MindFry::with_config(db_config).with_store(Arc::clone(&store));
println!(" ✓");
print!(" │ 🌐 Binding network interface...");
std::io::Write::flush(&mut std::io::stdout())?;
let addr = format!("{}:{}", server_config.host, server_config.port);
let listener = TcpListener::bind(&addr).await?;
println!(" ✓ ({})", addr);
let db = Arc::new(RwLock::new(db));
let warmup = mindfry::stability::WarmupTracker::new();
let has_snapshot = store
.list_snapshots()
.map(|s| !s.is_empty())
.unwrap_or(false);
if has_snapshot {
print!(" │ 🔄 Resurrection...");
std::io::Write::flush(&mut std::io::stdout())?;
warmup.begin_resurrection();
println!(" (async)");
let db_clone = Arc::clone(&db);
let warmup_clone = warmup.clone();
tokio::spawn(async move {
let start = std::time::Instant::now();
let result = {
let mut db = db_clone.write().unwrap();
db.resurrect()
};
match result {
Ok(true) => {
{
let mut db = db_clone.write().unwrap();
db.bootstrap_system_lineages();
}
info!("✅ Resurrection complete in {:?}", start.elapsed());
}
Ok(false) => {
info!("🌱 No snapshot found, genesis mode");
}
Err(e) => {
warn!("⚠️ Resurrection failed: {}", e);
}
}
warmup_clone.mark_ready();
});
} else {
print!(" │ 🌱 Genesis mode...");
std::io::Write::flush(&mut std::io::stdout())?;
{
let mut db = db.write().unwrap();
db.bootstrap_system_lineages();
}
println!(" ✓");
}
println!(" └────────────────────────────────────────────────────────────┘");
println!();
info!(
"Ready | {} lineages | {} bonds | max {} connections | warmup: {:?}",
db.read().unwrap().psyche.len(),
db.read().unwrap().bonds.len(),
server_config.max_connections,
warmup.state()
);
let shutdown_result = tokio::select! {
result = accept_loop(listener, Arc::clone(&db), warmup.clone()) => {
result
}
_ = tokio::signal::ctrl_c() => {
info!("🛑 Shutdown signal received (Ctrl+C)");
Ok(mindfry::stability::ShutdownReason::Signal { signal: 2 }) }
};
match shutdown_result {
Ok(reason) => {
info!("📝 Recording shutdown experience: {}", reason.description());
{
let db_guard = db.read().unwrap();
if let Some(ref store) = db_guard.store {
match store.take_snapshot(
Some("pre-shutdown"),
&db_guard.psyche,
&db_guard.strata,
&db_guard.bonds,
Some(&db_guard.cortex),
mindfry::persistence::PhysicsSnapshot::default(),
) {
Ok(meta) => info!("💾 Pre-shutdown snapshot saved: {}", meta.id),
Err(e) => warn!("⚠️ Failed to save shutdown snapshot: {}", e),
}
let marker = mindfry::stability::ShutdownMarker::graceful();
match store.write_shutdown_marker(&marker) {
Ok(_) => info!("✅ Graceful shutdown marker written"),
Err(e) => warn!("⚠️ Failed to write shutdown marker: {}", e),
}
}
}
info!("😴 MindFry going to sleep... Goodbye!");
}
Err(e) => {
error!("💔 Server error: {}", e);
}
}
Ok(())
}
async fn accept_loop(
listener: TcpListener,
db: Arc<RwLock<MindFry>>,
warmup: mindfry::stability::WarmupTracker,
) -> Result<mindfry::stability::ShutdownReason, Box<dyn std::error::Error + Send + Sync>> {
loop {
match listener.accept().await {
Ok((socket, peer)) => {
info!("📥 New connection from {}", peer);
let db_clone = Arc::clone(&db);
let warmup_clone = warmup.clone();
tokio::spawn(async move {
if let Err(e) = handle_connection(socket, db_clone, warmup_clone).await {
error!("Connection error: {}", e);
}
info!("📤 Connection closed: {}", peer);
});
}
Err(e) => {
error!("Accept error: {}", e);
return Err(e.into());
}
}
}
}
async fn handle_connection(
mut socket: TcpStream,
db: Arc<RwLock<MindFry>>,
warmup: mindfry::stability::WarmupTracker,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut handler = CommandHandler::with_warmup(db, warmup);
let mut buffer = vec![0u8; 4096];
let mut read_buf = Vec::new();
loop {
let n = socket.read(&mut buffer).await?;
if n == 0 {
return Ok(());
}
read_buf.extend_from_slice(&buffer[..n]);
while read_buf.len() >= 5 {
let frame_len =
u32::from_le_bytes([read_buf[0], read_buf[1], read_buf[2], read_buf[3]]) as usize;
if frame_len > MAX_FRAME_SIZE {
warn!("Frame too large: {} bytes", frame_len);
return Err("Frame too large".into());
}
let total_len = 4 + frame_len;
if read_buf.len() < total_len {
break;
}
let frame: Vec<u8> = read_buf.drain(..total_len).collect();
match MfbpCodec::decode_request(&frame) {
Ok(request) => {
log_request(&request);
let response = handler.handle(request);
let response_bytes = MfbpCodec::encode_response(&response);
socket.write_all(&response_bytes).await?;
}
Err(e) => {
warn!("Failed to decode request: {}", e);
let error_response = mindfry::protocol::Response::Error {
code: mindfry::protocol::ErrorCode::MalformedPayload,
message: format!("Failed to decode: {}", e),
};
let response_bytes = MfbpCodec::encode_response(&error_response);
socket.write_all(&response_bytes).await?;
}
}
}
}
}
fn log_request(request: &Request) {
match request {
Request::Ping => info!(" → PING"),
Request::Stats => info!(" → STATS"),
Request::LineageCreate { id, .. } => info!(" → LINEAGE.CREATE '{}'", id),
Request::LineageGet { id, flags } => {
info!(" → LINEAGE.GET '{}' [flags:0x{:02X}]", id, flags)
}
Request::LineageStimulate { id, delta, flags } => {
info!(
" → LINEAGE.STIMULATE '{}' +{} [flags:0x{:02X}]",
id, delta, flags
)
}
Request::LineageForget { id } => info!(" → LINEAGE.FORGET '{}'", id),
Request::BondConnect { source, target, .. } => {
info!(" → BOND.CONNECT '{}' ↔ '{}'", source, target)
}
Request::QueryConscious { .. } => info!(" → QUERY.CONSCIOUS"),
Request::QueryTopK { k } => info!(" → QUERY.TOP_K({})", k),
Request::QueryTrauma { min_rigidity } => info!(" → QUERY.TRAUMA(≥{})", min_rigidity),
Request::Snapshot { name } => info!(" → SYS.SNAPSHOT '{}'", name),
Request::Freeze { frozen } => {
info!(" → SYS.{}", if *frozen { "FREEZE" } else { "THAW" })
}
_ => info!(" → {:?}", request.opcode()),
}
}