use std::io::{BufRead, BufReader, Write};
use std::net::TcpListener;
use std::path::Path;
use std::sync::Arc;
use std::time::Instant;
use grumpydb::{SharedDb, Value};
use uuid::Uuid;
use super::task::Task;
/// Runs a simple concurrent write/read benchmark against the database at `db_path`.
///
/// The write phase spawns `writers` threads, each inserting `count` string
/// values under keys derived from the writer index and item index. The read
/// phase then spawns `readers` threads, each performing one full scan. Timing
/// and throughput for both phases are printed to stdout, after which the
/// database is flushed and closed.
///
/// # Errors
///
/// Returns a description of the failure if the database cannot be opened,
/// an insert or scan fails, a worker thread panics, or the final flush/close
/// fails.
pub fn run_bench(db_path: &Path, writers: usize, readers: usize, count: usize) -> Result<(), String> {
    /// Joins every handle (so no thread is left detached) and reports the
    /// first failure seen, using `panic_msg` for a thread that panicked.
    fn join_all(
        handles: Vec<std::thread::JoinHandle<Result<(), String>>>,
        panic_msg: &str,
    ) -> Result<(), String> {
        let mut first_err: Option<String> = None;
        for handle in handles {
            let outcome = handle
                .join()
                .unwrap_or_else(|_| Err(panic_msg.to_string()));
            if let Err(e) = outcome {
                if first_err.is_none() {
                    first_err = Some(e);
                }
            }
        }
        match first_err {
            Some(e) => Err(e),
            None => Ok(()),
        }
    }

    let db = SharedDb::open(db_path).map_err(|e| format!("Failed to open: {e}"))?;
    println!("Benchmark: {writers} writers × {count} inserts + {readers} readers");
    println!("{}", "-".repeat(50));

    // ---- Write phase ----
    let start = Instant::now();
    let mut writer_handles = Vec::with_capacity(writers);
    for t in 0..writers {
        let db = db.clone();
        writer_handles.push(std::thread::spawn(move || -> Result<(), String> {
            // Widen before multiplying so the key base cannot overflow `usize`.
            let base = t as u128 * count as u128;
            for i in 0..count {
                let key = Uuid::from_u128(base + i as u128);
                let value = Value::String(format!("bench_task_{t}_{i}"));
                // Propagate the database error instead of panicking so the
                // caller sees the real cause.
                db.insert(key, value)
                    .map_err(|e| format!("Insert failed (writer {t}, item {i}): {e}"))?;
            }
            Ok(())
        }));
    }
    join_all(writer_handles, "Writer thread panicked")?;
    let write_elapsed = start.elapsed();
    let total_writes = writers * count;
    let write_ops_sec = total_writes as f64 / write_elapsed.as_secs_f64();
    println!(" Writes: {total_writes} in {write_elapsed:.2?} ({write_ops_sec:.0} ops/sec)");

    // ---- Read phase ----
    let start = Instant::now();
    let total_keys = Arc::new(std::sync::atomic::AtomicUsize::new(0));
    let mut reader_handles = Vec::with_capacity(readers);
    for _ in 0..readers {
        let db = db.clone();
        let total_keys = total_keys.clone();
        reader_handles.push(std::thread::spawn(move || -> Result<(), String> {
            let results = db.scan(..).map_err(|e| format!("Scan failed: {e}"))?;
            // Only the final sum matters and it is read after all threads are
            // joined, so relaxed ordering is sufficient.
            total_keys.fetch_add(results.len(), std::sync::atomic::Ordering::Relaxed);
            Ok(())
        }));
    }
    join_all(reader_handles, "Reader thread panicked")?;
    let read_elapsed = start.elapsed();
    let total_reads = total_keys.load(std::sync::atomic::Ordering::Relaxed);
    let read_ops_sec = total_reads as f64 / read_elapsed.as_secs_f64();
    println!(" Reads: {total_reads} docs across {readers} threads in {read_elapsed:.2?} ({read_ops_sec:.0} docs/sec)");

    db.flush().map_err(|e| format!("Flush failed: {e}"))?;
    db.close().map_err(|e| format!("Close failed: {e}"))?;
    println!(" Done.");
    Ok(())
}
/// Opens the database at `db_path`, binds a TCP listener on `addr`, and serves
/// clients until the process is stopped.
///
/// Each accepted connection is handled on its own thread by `handle_client`,
/// with a clone of the database handle. Connection and disconnection events
/// are logged to stdout; per-client errors are logged to stderr.
///
/// # Errors
///
/// Returns a description of the failure if the database cannot be opened or
/// the listener cannot be bound. A failed `accept` is logged and skipped
/// rather than shutting the server down.
pub fn run_server(db_path: &Path, addr: &str) -> Result<(), String> {
    let db = SharedDb::open(db_path).map_err(|e| format!("Failed to open: {e}"))?;
    let listener = TcpListener::bind(addr).map_err(|e| format!("Bind failed: {e}"))?;
    println!("TaskMan server listening on {addr}");
    // netcat takes host and port as separate arguments, so print the bound
    // address in that form; fall back to the raw string if it is unavailable.
    match listener.local_addr() {
        Ok(bound) => println!("Connect with: nc {} {}", bound.ip(), bound.port()),
        Err(_) => println!("Connect with: nc {addr}"),
    }
    println!("Commands: ADD <title> | GET <uuid> | LIST | DONE <uuid> | DELETE <uuid> | STATS | QUIT");

    for stream in listener.incoming() {
        let stream = match stream {
            Ok(stream) => stream,
            Err(e) => {
                // One failed accept must not take down the clients that are
                // already connected. Pause briefly so a persistent failure
                // does not turn into a busy loop.
                eprintln!("Accept failed: {e}");
                std::thread::sleep(std::time::Duration::from_millis(50));
                continue;
            }
        };
        let peer = stream.peer_addr().map(|a| a.to_string()).unwrap_or_default();
        println!("Client connected: {peer}");
        let db = db.clone();
        std::thread::spawn(move || {
            if let Err(e) = handle_client(stream, &db) {
                eprintln!("Client {peer} error: {e}");
            }
            println!("Client disconnected: {peer}");
        });
    }
    Ok(())
}
/// Serves one client connection using a line-based text protocol.
///
/// Each non-empty line is split into a command word (case-insensitive) and an
/// optional argument. The command is dispatched to the matching handler and
/// the handler's response is written back to the client. `QUIT` replies with
/// `BYE` and ends the session; end of input also ends it.
///
/// # Errors
///
/// Returns the I/O error text if the stream cannot be cloned, a line cannot
/// be read, or a response cannot be written.
fn handle_client(
    stream: std::net::TcpStream,
    db: &SharedDb,
) -> Result<(), String> {
    let reader = BufReader::new(stream.try_clone().map_err(|e| e.to_string())?);
    let mut writer = stream;

    for line in reader.lines() {
        let line = line.map_err(|e| e.to_string())?;
        let line = line.trim();
        if line.is_empty() {
            continue;
        }

        // Split on the first run of whitespace (spaces or tabs) and strip any
        // extra padding, so `GET   <uuid>` parses the same as `GET <uuid>`.
        let (cmd, arg) = match line.split_once(char::is_whitespace) {
            Some((cmd, rest)) => (cmd, rest.trim_start()),
            None => (line, ""),
        };
        let cmd = cmd.to_uppercase();

        let response = match cmd.as_str() {
            "ADD" => handle_add(db, arg),
            "GET" => handle_get(db, arg),
            "LIST" => handle_list(db),
            "DONE" => handle_done(db, arg),
            "DELETE" => handle_delete(db, arg),
            "STATS" => handle_stats(db),
            "QUIT" => {
                // Best-effort farewell: the session ends either way.
                let _ = writer.write_all(b"BYE\n");
                return Ok(());
            }
            _ => format!("ERR unknown command: {cmd}\n"),
        };
        writer.write_all(response.as_bytes()).map_err(|e| e.to_string())?;
    }
    Ok(())
}
/// Handles `ADD <title>`: creates a new task with the given title and stores it.
///
/// Replies `OK <id>` on success, `ERR missing title` when `title` is empty,
/// or `ERR <error>` when the insert fails.
fn handle_add(db: &SharedDb, title: &str) -> String {
    if title.is_empty() {
        return String::from("ERR missing title\n");
    }

    let new_task = Task::new(title, None, vec![]);
    let id = new_task.id;
    db.insert(id, new_task.to_value())
        .map_or_else(|e| format!("ERR {e}\n"), |()| format!("OK {id}\n"))
}
/// Handles `GET <uuid>`: looks up a single task and renders it.
///
/// Replies `OK <task>` when found and decodable, `ERR invalid UUID: ...` when
/// `id_str` does not parse, `ERR not found` when no entry exists,
/// `ERR malformed task` when the stored value is not a task, or
/// `ERR <error>` when the lookup fails.
fn handle_get(db: &SharedDb, id_str: &str) -> String {
    let id = match Uuid::parse_str(id_str) {
        Ok(parsed) => parsed,
        Err(_) => return format!("ERR invalid UUID: {id_str}\n"),
    };

    let stored = match db.get(&id) {
        Ok(Some(found)) => found,
        Ok(None) => return "ERR not found\n".to_string(),
        Err(e) => return format!("ERR {e}\n"),
    };

    match Task::from_value(id, &stored) {
        Some(task) => format!("OK {task}\n"),
        None => "ERR malformed task\n".to_string(),
    }
}
/// Handles `LIST`: renders every stored entry that decodes as a task.
///
/// Replies with an `OK <n> tasks` header followed by one line per task, or
/// `ERR <error>` when the scan fails. Entries that do not decode as tasks are
/// skipped and are not counted, so `<n>` always matches the number of task
/// lines that follow.
fn handle_list(db: &SharedDb) -> String {
    let entries = match db.scan(..) {
        Ok(entries) => entries,
        Err(e) => return format!("ERR {e}\n"),
    };

    // Render first, then count, so the header reflects what is actually sent.
    let lines: Vec<String> = entries
        .iter()
        .filter_map(|(key, value)| Task::from_value(*key, value))
        .map(|task| format!(" {task}\n"))
        .collect();

    let mut out = format!("OK {} tasks\n", lines.len());
    out.extend(lines);
    out
}
/// Handles `DONE <uuid>`: marks an existing task as done.
///
/// Replies `OK done` on success, `ERR invalid UUID: ...` when `id_str` does
/// not parse, `ERR not found` when no entry exists, `ERR malformed task` when
/// the stored value is not a task, or `ERR <error>` when the lookup or update
/// fails.
fn handle_done(db: &SharedDb, id_str: &str) -> String {
    let id = match Uuid::parse_str(id_str) {
        Ok(parsed) => parsed,
        Err(_) => return format!("ERR invalid UUID: {id_str}\n"),
    };

    // NOTE(review): the read and the write below are separate calls; confirm
    // whether a concurrent client could change the task in between.
    let stored = match db.get(&id) {
        Ok(Some(found)) => found,
        Ok(None) => return "ERR not found\n".to_string(),
        Err(e) => return format!("ERR {e}\n"),
    };

    let Some(mut task) = Task::from_value(id, &stored) else {
        return "ERR malformed task\n".to_string();
    };
    task.done = true;

    match db.update(&id, task.to_value()) {
        Ok(()) => "OK done\n".to_string(),
        Err(e) => format!("ERR {e}\n"),
    }
}
/// Handles `DELETE <uuid>`: removes the entry with the given id.
///
/// Replies `OK deleted` on success, `ERR invalid UUID: ...` when `id_str`
/// does not parse, or `ERR <error>` when the delete fails.
fn handle_delete(db: &SharedDb, id_str: &str) -> String {
    match Uuid::parse_str(id_str) {
        Err(_) => format!("ERR invalid UUID: {id_str}\n"),
        Ok(id) => match db.delete(&id) {
            Ok(()) => String::from("OK deleted\n"),
            Err(e) => format!("ERR {e}\n"),
        },
    }
}
/// Handles `STATS`: reports how many tasks exist and how many are done.
///
/// Replies `OK total=<n> done=<d> pending=<n-d>`, or `ERR <error>` when the
/// scan fails. Only entries that decode as tasks are counted, so an
/// undecodable entry is never reported as a pending task.
fn handle_stats(db: &SharedDb) -> String {
    let entries = match db.scan(..) {
        Ok(entries) => entries,
        Err(e) => return format!("ERR {e}\n"),
    };

    // Single pass: count decodable tasks and, among them, the completed ones.
    let (total, done) = entries
        .iter()
        .filter_map(|(k, v)| Task::from_value(*k, v))
        .fold((0usize, 0usize), |(total, done), task| {
            (total + 1, done + usize::from(task.done))
        });

    format!("OK total={total} done={done} pending={}\n", total - done)
}