use std::sync::{Arc, RwLock};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tracing::{error, info, warn, Level};
use tracing_subscriber::FmtSubscriber;
use mindfry::persistence::{AkashicConfig, AkashicStore};
use mindfry::protocol::{CommandHandler, MfbpCodec, Request};
use mindfry::{MindFry, MindFryConfig};
/// TCP port used by `ServerConfig::default()`.
const DEFAULT_PORT: u16 = 9527;

/// Largest frame body, in bytes, that a connection will accept (16 MiB).
/// A length prefix announcing more than this terminates the connection.
const MAX_FRAME_SIZE: usize = 16 << 20;
/// Network-facing settings for the MFBP server.
///
/// Derives `Debug` and `Clone` so the configuration can be printed in
/// diagnostics and handed to other components without manual copying.
#[derive(Debug, Clone)]
struct ServerConfig {
    /// Interface address the TCP listener binds to.
    host: String,
    /// TCP port the listener binds to.
    port: u16,
    /// Maximum number of simultaneous client connections the server is
    /// configured for (reported in the startup log).
    max_connections: usize,
}
impl Default for ServerConfig {
fn default() -> Self {
Self {
host: "0.0.0.0".into(),
port: DEFAULT_PORT,
max_connections: 1024,
}
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let subscriber = FmtSubscriber::builder()
.with_max_level(Level::INFO)
.finish();
tracing::subscriber::set_global_default(subscriber)?;
println!();
println!(" ╔═══════════════════════════════════════════════════════════╗");
println!(" ║ 🧠🔥 MindFry - The World's First Ephemeral Graph Database ║");
println!(" ║ COGNITIVE DB ENGINE ║");
println!(" ╚═══════════════════════════════════════════════════════════╝");
println!();
let server_config = ServerConfig::default();
let store_config = AkashicConfig::default();
let store = match AkashicStore::open(store_config) {
Ok(s) => {
info!("💾 Storage mounted: ./mindfry_data");
Arc::new(s)
}
Err(e) => {
error!("Failed to open storage: {}", e);
return Err(e.into());
}
};
let db_config = MindFryConfig::default();
let mut db = MindFry::with_config(db_config).with_store(Arc::clone(&store));
match db.resurrect() {
Ok(true) => info!("🧬 Resurrection successful"),
Ok(false) => info!("🌱 Genesis mode: Starting fresh"),
Err(e) => {
warn!("⚠️ Resurrection failed: {}. Starting fresh.", e);
}
}
let db = Arc::new(RwLock::new(db));
info!(
"Psyche Arena capacity: {} lineages",
db.read().unwrap().psyche.capacity()
);
info!(
"Bond Graph capacity: {} bonds",
db.read().unwrap().bonds.len()
);
let addr = format!("{}:{}", server_config.host, server_config.port);
let listener = TcpListener::bind(&addr).await?;
info!("🌐 MFBP Server listening on {}", addr);
info!(
"Ready to accept connections (max: {})",
server_config.max_connections
);
loop {
match listener.accept().await {
Ok((socket, peer)) => {
info!("📥 New connection from {}", peer);
let db_clone = Arc::clone(&db);
tokio::spawn(async move {
if let Err(e) = handle_connection(socket, db_clone).await {
error!("Connection error: {}", e);
}
info!("📤 Connection closed: {}", peer);
});
}
Err(e) => {
error!("Accept error: {}", e);
}
}
}
}
/// Serves a single client connection until it closes or fails.
///
/// Bytes read from `socket` are accumulated and split into frames. Each frame
/// starts with a 4-byte little-endian `u32` giving the number of bytes that
/// follow. Every complete frame (length prefix included) is handed to
/// `MfbpCodec::decode_request`; a decoded request is logged, executed through
/// a `CommandHandler` built over `db`, and its encoded response is written
/// back. A frame that fails to decode is answered with a
/// `MalformedPayload` error response and the connection stays open.
///
/// A frame is parsed as soon as its 4-byte length prefix is buffered, so a
/// frame declaring a zero-length body is processed immediately rather than
/// waiting for unrelated bytes to arrive.
///
/// # Returns
///
/// `Ok(())` once a read returns 0 bytes (the peer closed the connection).
/// Any partially received frame is discarded at that point.
///
/// # Errors
///
/// Propagates socket read/write errors, and fails with `"Frame too large"`
/// when a length prefix exceeds `MAX_FRAME_SIZE`.
async fn handle_connection(
    mut socket: TcpStream,
    db: Arc<RwLock<MindFry>>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    /// Size of the little-endian `u32` length prefix that starts every frame.
    const LENGTH_PREFIX_LEN: usize = 4;
    /// Size of the scratch buffer used for each socket read.
    const READ_CHUNK_LEN: usize = 4096;

    let mut handler = CommandHandler::new(db);
    let mut buffer = vec![0u8; READ_CHUNK_LEN];
    // Bytes received but not yet consumed as complete frames.
    let mut read_buf = Vec::new();

    loop {
        let n = socket.read(&mut buffer).await?;
        if n == 0 {
            return Ok(());
        }
        read_buf.extend_from_slice(&buffer[..n]);

        // Drain every complete frame currently buffered; one read may carry
        // several frames, or only part of one.
        while read_buf.len() >= LENGTH_PREFIX_LEN {
            let frame_len =
                u32::from_le_bytes([read_buf[0], read_buf[1], read_buf[2], read_buf[3]]) as usize;
            // Reject oversized frames before buffering their body.
            if frame_len > MAX_FRAME_SIZE {
                warn!("Frame too large: {} bytes", frame_len);
                return Err("Frame too large".into());
            }

            let total_len = LENGTH_PREFIX_LEN + frame_len;
            if read_buf.len() < total_len {
                // Body not fully received yet; go back to reading.
                break;
            }

            // The decoder receives the whole frame, length prefix included.
            let frame: Vec<u8> = read_buf.drain(..total_len).collect();
            match MfbpCodec::decode_request(&frame) {
                Ok(request) => {
                    log_request(&request);
                    let response = handler.handle(request);
                    let response_bytes = MfbpCodec::encode_response(&response);
                    socket.write_all(&response_bytes).await?;
                }
                Err(e) => {
                    warn!("Failed to decode request: {}", e);
                    let error_response = mindfry::protocol::Response::Error {
                        code: mindfry::protocol::ErrorCode::MalformedPayload,
                        message: format!("Failed to decode: {}", e),
                    };
                    let response_bytes = MfbpCodec::encode_response(&error_response);
                    socket.write_all(&response_bytes).await?;
                }
            }
        }
    }
}
/// Writes one INFO-level trace line summarising `request`.
///
/// Variants with a dedicated arm get a readable command name plus their key
/// arguments; every other variant is reported by the `Debug` form of its
/// opcode.
fn log_request(request: &Request) {
    match request {
        // Requests without arguments worth printing.
        Request::Ping => info!(" → PING"),
        Request::Stats => info!(" → STATS"),
        Request::QueryConscious { .. } => info!(" → QUERY.CONSCIOUS"),

        // Lineage operations.
        Request::LineageCreate { id: lineage, .. } => {
            info!(" → LINEAGE.CREATE '{}'", lineage)
        }
        Request::LineageGet {
            id: lineage,
            flags: bits,
        } => info!(" → LINEAGE.GET '{}' [flags:0x{:02X}]", lineage, bits),
        Request::LineageStimulate {
            id: lineage,
            delta: amount,
            flags: bits,
        } => info!(
            " → LINEAGE.STIMULATE '{}' +{} [flags:0x{:02X}]",
            lineage, amount, bits
        ),
        Request::LineageForget { id: lineage } => {
            info!(" → LINEAGE.FORGET '{}'", lineage)
        }

        // Bond operations.
        Request::BondConnect {
            source: from,
            target: to,
            ..
        } => info!(" → BOND.CONNECT '{}' ↔ '{}'", from, to),

        // Parameterised queries.
        Request::QueryTopK { k: limit } => info!(" → QUERY.TOP_K({})", limit),
        Request::QueryTrauma {
            min_rigidity: threshold,
        } => info!(" → QUERY.TRAUMA(≥{})", threshold),

        // System commands.
        Request::Snapshot { name: label } => info!(" → SYS.SNAPSHOT '{}'", label),
        Request::Freeze { frozen } => {
            let verb = if *frozen { "FREEZE" } else { "THAW" };
            info!(" → SYS.{}", verb)
        }

        // Anything without a dedicated arm.
        other => info!(" → {:?}", other.opcode()),
    }
}