use serde::{Deserialize, Serialize};
use lmdb::Cursor;
use lmdb::Transaction;
#[derive(Debug, PartialEq)]
struct Event {
data: String,
event: Option<String>,
id: Option<i64>,
}
#[derive(PartialEq, Debug, Serialize, Deserialize)]
pub struct Frame {
pub id: scru128::Scru128Id,
#[serde(skip_serializing_if = "Option::is_none")]
pub topic: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub attribute: Option<String>,
pub data: String,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ResponseFrame {
pub source_id: scru128::Scru128Id,
pub data: String,
}
pub fn store_open(path: &std::path::Path) -> Result<lmdb::Environment, Box<dyn std::error::Error>> {
std::fs::create_dir_all(path)?;
let env = lmdb::Environment::new()
.set_map_size(10 * 10485760)
.open(path)?;
Ok(env)
}
pub fn store_put(
env: &lmdb::Environment,
topic: Option<String>,
attribute: Option<String>,
data: String,
) -> Result<scru128::Scru128Id, Box<dyn std::error::Error>> {
let id = scru128::new();
let frame = Frame {
id,
topic,
attribute,
data: data.trim().to_string(),
};
let frame = serde_json::to_vec(&frame)?;
let db = env.open_db(None)?;
let mut txn = env.begin_rw_txn()?;
txn.put(
db,
&id.to_u128().to_be_bytes(),
&frame,
lmdb::WriteFlags::empty(),
)?;
txn.commit()?;
Ok(id)
}
pub fn store_get(
env: &lmdb::Environment,
id: scru128::Scru128Id,
) -> Result<Option<Frame>, Box<dyn std::error::Error>> {
let db = env.open_db(None)?;
let txn = env.begin_ro_txn()?;
match txn.get(db, &id.to_u128().to_be_bytes()) {
Ok(value) => Ok(Some(serde_json::from_slice(value)?)),
Err(lmdb::Error::NotFound) => Ok(None),
Err(err) => Err(Box::new(err)),
}
}
pub fn store_cat(
env: &lmdb::Environment,
last_id: Option<scru128::Scru128Id>,
) -> Result<Vec<Frame>, Box<dyn std::error::Error>> {
let db = env.open_db(None)?;
let txn = env.begin_ro_txn()?;
let mut c = txn.open_ro_cursor(db)?;
let it = match last_id {
Some(key) => {
let mut i = c.iter_from(key.to_u128().to_be_bytes());
i.next();
i
}
None => c.iter_start(),
};
it.map(|item| -> Result<Frame, Box<dyn std::error::Error>> {
let (_, value) = item?;
Ok(serde_json::from_slice(value)?)
})
.collect::<Result<Vec<_>, _>>()
}
#[cfg(test)]
mod tests {
use super::*;
use indoc::indoc;
use std::io::BufRead;
use std::io::Read;
use temp_dir::TempDir;
#[test]
fn test_store() {
let d = TempDir::new().unwrap();
let env = store_open(&d.path()).unwrap();
let id = store_put(&env, None, None, "foo".into()).unwrap();
assert_eq!(store_cat(&env, None).unwrap().len(), 1);
let frame = store_get(&env, id).unwrap().unwrap();
assert_eq!(
frame,
Frame {
id: id,
topic: None,
attribute: None,
data: "foo".into()
}
);
assert_eq!(store_cat(&env, Some(id)).unwrap().len(), 0);
}
use std::io::BufReader;
fn parse_sse<R: Read>(buf: &mut BufReader<R>) -> Option<Event> {
let mut line = String::new();
let mut data = Vec::<String>::new();
let mut id: Option<i64> = None;
loop {
line.clear();
let n = buf.read_line(&mut line).unwrap();
if n == 0 {
return None;
}
if line == "\n" {
break;
}
let (field, rest) = line.split_at(line.find(":").unwrap() + 1);
let rest = rest.trim();
match field {
":" => (),
"id:" => id = Some(rest.parse::<i64>().unwrap()),
"data:" => data.push(rest.to_string()),
_ => todo!(),
};
}
return Some(Event {
data: data.join(" "),
event: None,
id: id,
});
}
#[test]
fn test_parse_sse() {
let mut buf = BufReader::new(
indoc! {"
: welcome
id: 1
data: foo
data: bar
id: 2
data: hai
"}
.as_bytes(),
);
let event = parse_sse(&mut buf).unwrap();
assert_eq!(
event,
Event {
data: "foo bar".into(),
event: None,
id: Some(1),
}
);
let event = parse_sse(&mut buf).unwrap();
assert_eq!(
event,
Event {
data: "hai".into(),
event: None,
id: Some(2),
}
);
}
}