xs_lib/
lib.rs

1use serde::{Deserialize, Serialize};
2
3use lmdb::Cursor;
4use lmdb::Transaction;
5
/// One parsed server-sent event.
#[derive(PartialEq, Debug)]
struct Event {
    /// Text assembled from the event's `data` lines.
    data: String,
    /// Optional event name.
    event: Option<String>,
    /// Optional numeric event id.
    id: Option<i64>,
}
12
13#[derive(PartialEq, Debug, Serialize, Deserialize)]
14pub struct Frame {
15    pub id: scru128::Scru128Id,
16    #[serde(skip_serializing_if = "Option::is_none")]
17    pub topic: Option<String>,
18    #[serde(skip_serializing_if = "Option::is_none")]
19    pub attribute: Option<String>,
20    pub data: String,
21}
22
23#[derive(Debug, Serialize, Deserialize)]
24pub struct ResponseFrame {
25    pub source_id: scru128::Scru128Id,
26    pub data: String,
27}
28
29pub fn store_open(path: &std::path::Path) -> Result<lmdb::Environment, Box<dyn std::error::Error>> {
30    std::fs::create_dir_all(path)?;
31    let env = lmdb::Environment::new()
32        .set_map_size(10 * 10485760)
33        .open(path)?;
34    Ok(env)
35}
36
37pub fn store_put(
38    env: &lmdb::Environment,
39    topic: Option<String>,
40    attribute: Option<String>,
41    data: String,
42) -> Result<scru128::Scru128Id, Box<dyn std::error::Error>> {
43    let id = scru128::new();
44
45    let frame = Frame {
46        id,
47        topic,
48        attribute,
49        data: data.trim().to_string(),
50    };
51    let frame = serde_json::to_vec(&frame)?;
52
53    let db = env.open_db(None)?;
54    let mut txn = env.begin_rw_txn()?;
55    txn.put(
56        db,
57        &id.to_u128().to_be_bytes(),
58        &frame,
59        lmdb::WriteFlags::empty(),
60    )?;
61    txn.commit()?;
62
63    Ok(id)
64}
65
66pub fn store_get(
67    env: &lmdb::Environment,
68    id: scru128::Scru128Id,
69) -> Result<Option<Frame>, Box<dyn std::error::Error>> {
70    let db = env.open_db(None)?;
71    let txn = env.begin_ro_txn()?;
72    match txn.get(db, &id.to_u128().to_be_bytes()) {
73        Ok(value) => Ok(Some(serde_json::from_slice(value)?)),
74        Err(lmdb::Error::NotFound) => Ok(None),
75        Err(err) => Err(Box::new(err)),
76    }
77}
78
79pub fn store_cat(
80    env: &lmdb::Environment,
81    last_id: Option<scru128::Scru128Id>,
82) -> Result<Vec<Frame>, Box<dyn std::error::Error>> {
83    let db = env.open_db(None)?;
84    let txn = env.begin_ro_txn()?;
85    let mut c = txn.open_ro_cursor(db)?;
86    let it = match last_id {
87        Some(key) => {
88            let mut i = c.iter_from(key.to_u128().to_be_bytes());
89            i.next();
90            i
91        }
92        None => c.iter_start(),
93    };
94    it.map(|item| -> Result<Frame, Box<dyn std::error::Error>> {
95        let (_, value) = item?;
96        Ok(serde_json::from_slice(value)?)
97    })
98    .collect::<Result<Vec<_>, _>>()
99}
100
101#[cfg(test)]
102mod tests {
103    use super::*;
104    use indoc::indoc;
105    use std::io::BufRead;
106    use std::io::Read;
107    use temp_dir::TempDir;
108    // use pretty_assertions::assert_eq;
109
110    #[test]
111    fn test_store() {
112        let d = TempDir::new().unwrap();
113        let env = store_open(&d.path()).unwrap();
114
115        let id = store_put(&env, None, None, "foo".into()).unwrap();
116        assert_eq!(store_cat(&env, None).unwrap().len(), 1);
117
118        let frame = store_get(&env, id).unwrap().unwrap();
119        assert_eq!(
120            frame,
121            Frame {
122                id: id,
123                topic: None,
124                attribute: None,
125                data: "foo".into()
126            }
127        );
128
129        // skip with last_id
130        assert_eq!(store_cat(&env, Some(id)).unwrap().len(), 0);
131    }
132
133    use std::io::BufReader;
134    fn parse_sse<R: Read>(buf: &mut BufReader<R>) -> Option<Event> {
135        let mut line = String::new();
136
137        let mut data = Vec::<String>::new();
138        let mut id: Option<i64> = None;
139
140        loop {
141            line.clear();
142            let n = buf.read_line(&mut line).unwrap();
143            if n == 0 {
144                // stream interrupted
145                return None;
146            }
147
148            if line == "\n" {
149                // end of event, emit
150                break;
151            }
152
153            let (field, rest) = line.split_at(line.find(":").unwrap() + 1);
154            let rest = rest.trim();
155            match field {
156                // comment
157                ":" => (),
158                "id:" => id = Some(rest.parse::<i64>().unwrap()),
159                "data:" => data.push(rest.to_string()),
160                _ => todo!(),
161            };
162        }
163
164        return Some(Event {
165            data: data.join(" "),
166            event: None,
167            id: id,
168        });
169    }
170
171    #[test]
172    fn test_parse_sse() {
173        let mut buf = BufReader::new(
174            indoc! {"
175        : welcome
176        id: 1
177        data: foo
178        data: bar
179
180        id: 2
181        data: hai
182
183        "}
184            .as_bytes(),
185        );
186
187        let event = parse_sse(&mut buf).unwrap();
188        assert_eq!(
189            event,
190            Event {
191                data: "foo bar".into(),
192                event: None,
193                id: Some(1),
194            }
195        );
196
197        let event = parse_sse(&mut buf).unwrap();
198        assert_eq!(
199            event,
200            Event {
201                data: "hai".into(),
202                event: None,
203                id: Some(2),
204            }
205        );
206    }
207}