1use serde::{Deserialize, Serialize};
2
3use lmdb::Cursor;
4use lmdb::Transaction;
5
6#[derive(Debug, PartialEq)]
7struct Event {
8 data: String,
9 event: Option<String>,
10 id: Option<i64>,
11}
12
13#[derive(PartialEq, Debug, Serialize, Deserialize)]
14pub struct Frame {
15 pub id: scru128::Scru128Id,
16 #[serde(skip_serializing_if = "Option::is_none")]
17 pub topic: Option<String>,
18 #[serde(skip_serializing_if = "Option::is_none")]
19 pub attribute: Option<String>,
20 pub data: String,
21}
22
23#[derive(Debug, Serialize, Deserialize)]
24pub struct ResponseFrame {
25 pub source_id: scru128::Scru128Id,
26 pub data: String,
27}
28
29pub fn store_open(path: &std::path::Path) -> Result<lmdb::Environment, Box<dyn std::error::Error>> {
30 std::fs::create_dir_all(path)?;
31 let env = lmdb::Environment::new()
32 .set_map_size(10 * 10485760)
33 .open(path)?;
34 Ok(env)
35}
36
37pub fn store_put(
38 env: &lmdb::Environment,
39 topic: Option<String>,
40 attribute: Option<String>,
41 data: String,
42) -> Result<scru128::Scru128Id, Box<dyn std::error::Error>> {
43 let id = scru128::new();
44
45 let frame = Frame {
46 id,
47 topic,
48 attribute,
49 data: data.trim().to_string(),
50 };
51 let frame = serde_json::to_vec(&frame)?;
52
53 let db = env.open_db(None)?;
54 let mut txn = env.begin_rw_txn()?;
55 txn.put(
56 db,
57 &id.to_u128().to_be_bytes(),
58 &frame,
59 lmdb::WriteFlags::empty(),
60 )?;
61 txn.commit()?;
62
63 Ok(id)
64}
65
66pub fn store_get(
67 env: &lmdb::Environment,
68 id: scru128::Scru128Id,
69) -> Result<Option<Frame>, Box<dyn std::error::Error>> {
70 let db = env.open_db(None)?;
71 let txn = env.begin_ro_txn()?;
72 match txn.get(db, &id.to_u128().to_be_bytes()) {
73 Ok(value) => Ok(Some(serde_json::from_slice(value)?)),
74 Err(lmdb::Error::NotFound) => Ok(None),
75 Err(err) => Err(Box::new(err)),
76 }
77}
78
79pub fn store_cat(
80 env: &lmdb::Environment,
81 last_id: Option<scru128::Scru128Id>,
82) -> Result<Vec<Frame>, Box<dyn std::error::Error>> {
83 let db = env.open_db(None)?;
84 let txn = env.begin_ro_txn()?;
85 let mut c = txn.open_ro_cursor(db)?;
86 let it = match last_id {
87 Some(key) => {
88 let mut i = c.iter_from(key.to_u128().to_be_bytes());
89 i.next();
90 i
91 }
92 None => c.iter_start(),
93 };
94 it.map(|item| -> Result<Frame, Box<dyn std::error::Error>> {
95 let (_, value) = item?;
96 Ok(serde_json::from_slice(value)?)
97 })
98 .collect::<Result<Vec<_>, _>>()
99}
100
101#[cfg(test)]
102mod tests {
103 use super::*;
104 use indoc::indoc;
105 use std::io::BufRead;
106 use std::io::Read;
107 use temp_dir::TempDir;
108 #[test]
111 fn test_store() {
112 let d = TempDir::new().unwrap();
113 let env = store_open(&d.path()).unwrap();
114
115 let id = store_put(&env, None, None, "foo".into()).unwrap();
116 assert_eq!(store_cat(&env, None).unwrap().len(), 1);
117
118 let frame = store_get(&env, id).unwrap().unwrap();
119 assert_eq!(
120 frame,
121 Frame {
122 id: id,
123 topic: None,
124 attribute: None,
125 data: "foo".into()
126 }
127 );
128
129 assert_eq!(store_cat(&env, Some(id)).unwrap().len(), 0);
131 }
132
133 use std::io::BufReader;
134 fn parse_sse<R: Read>(buf: &mut BufReader<R>) -> Option<Event> {
135 let mut line = String::new();
136
137 let mut data = Vec::<String>::new();
138 let mut id: Option<i64> = None;
139
140 loop {
141 line.clear();
142 let n = buf.read_line(&mut line).unwrap();
143 if n == 0 {
144 return None;
146 }
147
148 if line == "\n" {
149 break;
151 }
152
153 let (field, rest) = line.split_at(line.find(":").unwrap() + 1);
154 let rest = rest.trim();
155 match field {
156 ":" => (),
158 "id:" => id = Some(rest.parse::<i64>().unwrap()),
159 "data:" => data.push(rest.to_string()),
160 _ => todo!(),
161 };
162 }
163
164 return Some(Event {
165 data: data.join(" "),
166 event: None,
167 id: id,
168 });
169 }
170
171 #[test]
172 fn test_parse_sse() {
173 let mut buf = BufReader::new(
174 indoc! {"
175 : welcome
176 id: 1
177 data: foo
178 data: bar
179
180 id: 2
181 data: hai
182
183 "}
184 .as_bytes(),
185 );
186
187 let event = parse_sse(&mut buf).unwrap();
188 assert_eq!(
189 event,
190 Event {
191 data: "foo bar".into(),
192 event: None,
193 id: Some(1),
194 }
195 );
196
197 let event = parse_sse(&mut buf).unwrap();
198 assert_eq!(
199 event,
200 Event {
201 data: "hai".into(),
202 event: None,
203 id: Some(2),
204 }
205 );
206 }
207}