Skip to main content

docker_etcd/
lib.rs

1//! Docker etcd
2//!
3//! Rust implementation of etcd server, providing distributed key-value storage
4
5#![warn(missing_docs)]
6
7use docker_types::DockerError;
8use std::collections::HashMap;
9use std::sync::{Arc, RwLock};
10use tokio::sync::broadcast;
11
12/// 结果类型
13pub type Result<T> = std::result::Result<T, DockerError>;
14
15/// 键值对
16#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
17pub struct KeyValue {
18    pub key: Vec<u8>,
19    pub value: Vec<u8>,
20    pub create_revision: i64,
21    pub mod_revision: i64,
22    pub version: i64,
23    pub lease: i64,
24}
25
26/// 存储引擎
27pub struct Storage {
28    data: RwLock<HashMap<Vec<u8>, KeyValue>>,
29    revision: RwLock<i64>,
30    tx: broadcast::Sender<Event>,
31}
32
33/// 事件类型
34#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
35pub enum EventType {
36    Put,
37    Delete,
38}
39
40/// 事件
41#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
42pub struct Event {
43    pub r#type: EventType,
44    pub kv: KeyValue,
45    pub prev_kv: Option<KeyValue>,
46}
47
48impl Storage {
49    /// 创建新的存储引擎
50    pub fn new() -> Self {
51        let (tx, _) = broadcast::channel(100);
52        Self {
53            data: RwLock::new(HashMap::new()),
54            revision: RwLock::new(1),
55            tx,
56        }
57    }
58
59    /// 获取值
60    pub fn get(&self, key: &[u8]) -> Result<KeyValue> {
61        let data = self.data.read().unwrap();
62        data.get(key)
63            .cloned()
64            .ok_or(DockerError::not_found("key", "not found"))
65    }
66
67    /// 设置值
68    pub fn put(&self, key: Vec<u8>, value: Vec<u8>, lease: i64) -> Result<KeyValue> {
69        let mut data = self.data.write().unwrap();
70        let mut revision = self.revision.write().unwrap();
71
72        *revision += 1;
73        let new_revision = *revision;
74
75        let prev_kv = data.get(&key).cloned();
76
77        let kv = KeyValue {
78            key: key.clone(),
79            value,
80            create_revision: prev_kv
81                .as_ref()
82                .map(|kv| kv.create_revision)
83                .unwrap_or(new_revision),
84            mod_revision: new_revision,
85            version: prev_kv.as_ref().map(|kv| kv.version + 1).unwrap_or(1),
86            lease,
87        };
88
89        data.insert(key, kv.clone());
90
91        // 发送事件
92        let event = Event {
93            r#type: EventType::Put,
94            kv: kv.clone(),
95            prev_kv,
96        };
97        self.tx.send(event).ok();
98
99        Ok(kv)
100    }
101
102    /// 删除值
103    pub fn delete(&self, key: &[u8]) -> Result<Option<KeyValue>> {
104        let mut data = self.data.write().unwrap();
105        let prev_kv = data.remove(key);
106
107        if let Some(kv) = &prev_kv {
108            let mut revision = self.revision.write().unwrap();
109            *revision += 1;
110
111            // 发送事件
112            let event = Event {
113                r#type: EventType::Delete,
114                kv: kv.clone(),
115                prev_kv: None,
116            };
117            self.tx.send(event).ok();
118        }
119
120        Ok(prev_kv)
121    }
122
123    /// 列出所有键
124    pub fn list(&self) -> Vec<KeyValue> {
125        let data = self.data.read().unwrap();
126        data.values().cloned().collect()
127    }
128
129    /// 获取事件订阅
130    pub fn subscribe(&self) -> broadcast::Receiver<Event> {
131        self.tx.subscribe()
132    }
133
134    /// 获取当前版本
135    pub fn get_revision(&self) -> i64 {
136        *self.revision.read().unwrap()
137    }
138}
139
140/// Etcd服务器
141pub struct EtcdServer {
142    storage: Arc<Storage>,
143    address: String,
144}
145
146impl EtcdServer {
147    /// 创建新的etcd服务器
148    pub fn new(address: String) -> Self {
149        Self {
150            storage: Arc::new(Storage::new()),
151            address,
152        }
153    }
154
155    /// 获取存储引擎
156    pub fn storage(&self) -> Arc<Storage> {
157        self.storage.clone()
158    }
159
160    /// 启动服务器
161    pub async fn start(&self) -> Result<()> {
162        // 这里将实现HTTP/gRPC服务器
163        println!("Starting etcd server on {}", self.address);
164        Ok(())
165    }
166}