1use redis::{streams::StreamId, RedisError};
2use serde::{de::Deserialize, ser::Serialize};
3use std::collections::BTreeMap;
4
5extern crate self as redis_json;
6
7pub mod error;
8pub mod util;
9
10use error::Error;
11pub trait StreamEntry: Send + Sync + Serialize + for<'de> Deserialize<'de> {
13 fn from_stream_id(stream_id: &StreamId) -> Result<Self, Error> {
15 let data = stream_id
16 .map
17 .iter()
18 .map(|(key, value)| match value {
19 redis::Value::Data(ref bytes) => std::str::from_utf8(&bytes[..])
20 .map(|value| serde_json::Value::String(String::from(value)))
21 .map_err(|err| err.into())
22 .map(|value| (key.to_owned(), value)),
23 _ => Err(
24 RedisError::from((
25 redis::ErrorKind::TypeError,
26 "invalid response type for JSON",
27 ))
28 .into(),
29 ),
30 })
31 .collect::<Result<Vec<(String, serde_json::Value)>, Error>>()?;
32
33 let data = serde_json::map::Map::from_iter(data.into_iter());
34
35 let data = serde_json::from_value(serde_json::Value::Object(data))?;
36
37 Ok(data)
38 }
39
40 fn into_xadd_map(&self) -> Result<BTreeMap<String, String>, serde_json::Error> {
42 let value = serde_json::to_value(&self)?;
43
44 let data: Vec<(String, String)> = value
45 .as_object()
46 .into_iter()
47 .flat_map(|map| {
48 map.into_iter().filter_map(|(key, value)| match value {
49 serde_json::Value::Null => None,
50 serde_json::Value::Bool(value) => Some(Ok((key.to_owned(), value.to_string()))),
51 serde_json::Value::Number(value) => Some(Ok((key.to_owned(), value.to_string()))),
52 serde_json::Value::String(value) => Some(Ok((key.to_owned(), value.to_owned()))),
53
54 serde_json::Value::Array(value) => {
55 Some(serde_json::to_string(value).map(|value| (key.to_owned(), value)))
56 }
57
58 serde_json::Value::Object(value) => {
59 Some(serde_json::to_string(value).map(|value| (key.to_owned(), value)))
60 }
61 })
62 })
63 .collect::<Result<Vec<(String, String)>, serde_json::Error>>()?;
64
65 let data: BTreeMap<String, String> = BTreeMap::from_iter(data.into_iter());
66
67 Ok(data)
68 }
69}