1#![warn(missing_docs)]
6
7use docker_types::DockerError;
8use std::collections::HashMap;
9use std::sync::{Arc, RwLock};
10use tokio::sync::broadcast;
11
12pub type Result<T> = std::result::Result<T, DockerError>;
14
15#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
17pub struct KeyValue {
18 pub key: Vec<u8>,
19 pub value: Vec<u8>,
20 pub create_revision: i64,
21 pub mod_revision: i64,
22 pub version: i64,
23 pub lease: i64,
24}
25
26pub struct Storage {
28 data: RwLock<HashMap<Vec<u8>, KeyValue>>,
29 revision: RwLock<i64>,
30 tx: broadcast::Sender<Event>,
31}
32
33#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
35pub enum EventType {
36 Put,
37 Delete,
38}
39
40#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
42pub struct Event {
43 pub r#type: EventType,
44 pub kv: KeyValue,
45 pub prev_kv: Option<KeyValue>,
46}
47
48impl Storage {
49 pub fn new() -> Self {
51 let (tx, _) = broadcast::channel(100);
52 Self {
53 data: RwLock::new(HashMap::new()),
54 revision: RwLock::new(1),
55 tx,
56 }
57 }
58
59 pub fn get(&self, key: &[u8]) -> Result<KeyValue> {
61 let data = self.data.read().unwrap();
62 data.get(key)
63 .cloned()
64 .ok_or(DockerError::not_found("key", "not found"))
65 }
66
67 pub fn put(&self, key: Vec<u8>, value: Vec<u8>, lease: i64) -> Result<KeyValue> {
69 let mut data = self.data.write().unwrap();
70 let mut revision = self.revision.write().unwrap();
71
72 *revision += 1;
73 let new_revision = *revision;
74
75 let prev_kv = data.get(&key).cloned();
76
77 let kv = KeyValue {
78 key: key.clone(),
79 value,
80 create_revision: prev_kv
81 .as_ref()
82 .map(|kv| kv.create_revision)
83 .unwrap_or(new_revision),
84 mod_revision: new_revision,
85 version: prev_kv.as_ref().map(|kv| kv.version + 1).unwrap_or(1),
86 lease,
87 };
88
89 data.insert(key, kv.clone());
90
91 let event = Event {
93 r#type: EventType::Put,
94 kv: kv.clone(),
95 prev_kv,
96 };
97 self.tx.send(event).ok();
98
99 Ok(kv)
100 }
101
102 pub fn delete(&self, key: &[u8]) -> Result<Option<KeyValue>> {
104 let mut data = self.data.write().unwrap();
105 let prev_kv = data.remove(key);
106
107 if let Some(kv) = &prev_kv {
108 let mut revision = self.revision.write().unwrap();
109 *revision += 1;
110
111 let event = Event {
113 r#type: EventType::Delete,
114 kv: kv.clone(),
115 prev_kv: None,
116 };
117 self.tx.send(event).ok();
118 }
119
120 Ok(prev_kv)
121 }
122
123 pub fn list(&self) -> Vec<KeyValue> {
125 let data = self.data.read().unwrap();
126 data.values().cloned().collect()
127 }
128
129 pub fn subscribe(&self) -> broadcast::Receiver<Event> {
131 self.tx.subscribe()
132 }
133
134 pub fn get_revision(&self) -> i64 {
136 *self.revision.read().unwrap()
137 }
138}
139
140pub struct EtcdServer {
142 storage: Arc<Storage>,
143 address: String,
144}
145
146impl EtcdServer {
147 pub fn new(address: String) -> Self {
149 Self {
150 storage: Arc::new(Storage::new()),
151 address,
152 }
153 }
154
155 pub fn storage(&self) -> Arc<Storage> {
157 self.storage.clone()
158 }
159
160 pub async fn start(&self) -> Result<()> {
162 println!("Starting etcd server on {}", self.address);
164 Ok(())
165 }
166}