minkv 0.3.0

一个轻量级持久化键值存储,支持内存和文件后端,提供 CLI 和 TCP 服务器
Documentation
//! TCP 键值存储服务器。
//!
//! 使用线程池、`RwLock` 和 `BufWriter` 提供高性能并发服务。
//! 支持 `get`、`set`、`remove`、`scan` 命令,数据持久化到 `store.json`。
//!
//! # 启动
//! ```bash
//! cargo run --bin server
//! # 或者
//! minkv-server
//! ```
//!
//! # 协议
//! 每行发送一个命令:
//! - `GET <key>`
//! - `SET <key> <value>`
//! - `REMOVE <key>`
//! - `SCAN <prefix>`
//!
//! 返回值:
//! - `OK` 表示操作成功
//! - `错误:...` 表示错误
//! - 对于 `GET`:直接返回键值
//! - 对于 `SCAN`:先返回多行 `key value`,最后一行 `OK`
//!
//! 示例(telnet):
//! ```text
//! SET name Rust
//! OK
//! GET name
//! Rust
//! SCAN n
//! name Rust
//! OK
//! ```

use std::io::{BufRead, BufReader, BufWriter, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::{Arc, Mutex, RwLock};
use std::sync::mpsc::{self, Receiver};
use std::thread;
use minkv::{KvStore, FileStorage};

const SERVER_ADDR: &str = "127.0.0.1:8080";
const STORE_PATH: &str = "store.json";
const NUM_WORKERS: usize = 4;

fn main() {
    let listener = TcpListener::bind(SERVER_ADDR).expect("无法绑定端口");
    println!(
        "服务器已启动 (线程池大小: {}),监听地址: {}",
        NUM_WORKERS, SERVER_ADDR
    );
    println!(
        "数据文件保存路径: {}",
        std::env::current_dir().unwrap().join(STORE_PATH).display()
    );

    let store = Arc::new(RwLock::new(KvStore::<FileStorage>::open(STORE_PATH)));
    let store_path = {
        let guard = store.read().unwrap();
        guard.path().to_path_buf()
    };

    // Receiver 不支持 Sync,使用 Mutex 替代 RwLock
    let (tx, rx) = mpsc::channel::<TcpStream>();
    let rx = Arc::new(Mutex::new(rx));

    let mut workers = vec![];
    for id in 0..NUM_WORKERS {
        let store_clone = Arc::clone(&store);
        let rx_clone = Arc::clone(&rx);
        let path_clone = store_path.clone();
        workers.push(thread::spawn(move || {
            worker_loop(id, store_clone, rx_clone, path_clone)
        }));
    }

    for stream in listener.incoming() {
        match stream {
            Ok(stream) => {
                if let Err(e) = tx.send(stream) {
                    eprintln!("发送连接到工作线程失败: {}", e);
                    break;
                }
            }
            Err(e) => eprintln!("接受连接失败: {}", e),
        }
    }

    for handle in workers {
        handle.join().expect("工作线程崩溃");
    }
}

/// 工作线程主循环:从共享接收端获取新连接并处理。
fn worker_loop(
    worker_id: usize,
    store: Arc<RwLock<KvStore<FileStorage>>>,
    rx: Arc<Mutex<Receiver<TcpStream>>>,
    store_path: std::path::PathBuf,
) {
    println!("[工作线程 {}] 启动", worker_id);
    loop {
        let stream = {
            let receiver = rx.lock().unwrap();
            match receiver.recv() {
                Ok(stream) => stream,
                Err(_) => {
                    println!("[工作线程 {}] 通道关闭,退出", worker_id);
                    break;
                }
            }
        };
        if let Err(e) = handle_client(worker_id, stream, store.clone(), &store_path) {
            eprintln!("[工作线程 {}] 处理客户端出错: {}", worker_id, e);
        }
    }
    println!("[工作线程 {}] 停止", worker_id);
}

/// 处理一个客户端连接:逐行解析命令,执行操作并返回结果。
///
/// # 优化
/// - 使用 `BufWriter` 批量写回,减少系统调用。
/// - 写操作先在写锁内修改内存数据,克隆快照后释放锁,再执行文件 I/O。
/// - 读操作仅持有读锁。
fn handle_client(
    worker_id: usize,
    mut stream: TcpStream,
    store: Arc<RwLock<KvStore<FileStorage>>>,
    store_path: &std::path::Path,
) -> std::io::Result<()> {
    let peer_addr = stream.peer_addr()?;
    println!("[工作线程 {}] 新客户端连接: {}", worker_id, peer_addr);

    let mut writer = BufWriter::new(stream.try_clone()?);
    let reader = BufReader::new(&mut stream);

    let result = reader.lines().try_for_each(|line_result| -> std::io::Result<()> {
        let line = line_result?;
        let trimmed = line.trim();
        if trimmed.is_empty() {
            return Ok(());
        }

        println!("[工作线程 {}] 收到命令: {}", worker_id, trimmed);
        let parts: Vec<&str> = trimmed.split_whitespace().collect();
        if parts.is_empty() {
            return Ok(());
        }

        let response = match parts[0].to_lowercase().as_str() {
            "get" => {
                if parts.len() != 2 {
                    "错误:用法 GET <key>".to_string()
                } else {
                    let key = parts[1];
                    let s = store.read().unwrap();
                    match s.get(key) {
                        Some(v) => v.to_owned(),
                        None => format!("错误:键 '{}' 不存在", key),
                    }
                }
            }
            "set" => {
                if parts.len() != 3 {
                    "错误:用法 SET <key> <value>".to_string()
                } else {
                    let key = parts[1].to_string();
                    let value = parts[2].to_string();
                    // 写锁内修改并克隆快照
                    let snapshot = {
                        let mut s = store.write().unwrap();
                        s.set(key, value);
                        s.clone_data()
                    };
                    // 无锁持久化
                    match FileStorage::save_from_snapshot(&snapshot, store_path) {
                        Ok(()) => {
                            println!("[工作线程 {}] SET 持久化成功", worker_id);
                            "OK".to_string()
                        }
                        Err(e) => format!("错误:保存数据失败 - {}", e),
                    }
                }
            }
            "remove" => {
                if parts.len() != 2 {
                    "错误:用法 REMOVE <key>".to_string()
                } else {
                    let key = parts[1];
                    let (removed, snapshot) = {
                        let mut s = store.write().unwrap();
                        let removed = s.remove(key).is_some();
                        if removed {
                            (true, Some(s.clone_data()))
                        } else {
                            (false, None)
                        }
                    };
                    if removed {
                        let data = snapshot.unwrap();
                        match FileStorage::save_from_snapshot(&data, store_path) {
                            Ok(()) => "OK".to_string(),
                            Err(e) => format!("错误:保存数据失败 - {}", e),
                        }
                    } else {
                        format!("错误:键 '{}' 不存在", key)
                    }
                }
            }
            "scan" => {
                if parts.len() != 2 {
                    "错误:用法 SCAN <prefix>".to_string()
                } else {
                    let prefix = parts[1];
                    // 收集结果,尽早释放读锁
                    let items: Vec<(String, String)> = store
                        .read()
                        .unwrap()
                        .scan(prefix)
                        .collect();
                    for (k, v) in items {
                        writeln!(writer, "{} {}", k, v)?;
                    }
                    writer.flush()?;
                    "OK".to_string()
                }
            }
            _ => format!("错误:未知命令 '{}'", parts[0]),
        };

        writeln!(writer, "{}", response)?;
        writer.flush()?;
        Ok(())
    });

    if let Err(e) = result {
        eprintln!("[工作线程 {}] 读取命令流错误: {}", worker_id, e);
        return Err(e);
    }

    println!(
        "[工作线程 {}] 客户端 {} 断开连接",
        worker_id, peer_addr
    );
    Ok(())
}