use futures::{AsyncWrite, AsyncWriteExt, Stream};
use futures::io::{AsyncBufRead, AsyncBufReadExt, Lines};
use shvproto::RpcValue;
use std::borrow::Cow;
use std::collections::BTreeMap;
use std::fmt::Display;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::error::Error;
use crate::datachange::ValueFlags;
use crate::metamethod::AccessLevel;
use crate::journalentry::JournalEntry;
const JOURNAL_ENTRIES_SEPARATOR: &str = "\t";
const METH_GET: &str = "get";
const SIG_CHNG: &str = "chng";
fn parse_journal_entry_log2(line: &str) -> Result<JournalEntry, Box<dyn Error + Send + Sync>> {
let parts: Vec<&str> = line.split(JOURNAL_ENTRIES_SEPARATOR).collect();
let mut parts_iter = parts.iter().copied();
let epoch_msec = shvproto::datetime::DateTime::from_iso_str(parts_iter
.next()
.ok_or_else(|| format!("Missing timestamp on line: {line}"))?)
.map_err(|e| format!("Cannot parse timestamp on line: {line}, error: {e}"))?
.epoch_msec();
let _up_time = parts_iter.next();
let path = parts_iter.next().ok_or_else(|| format!("Missing path on line: {line}"))?.to_string();
let value = parts_iter.next().ok_or_else(|| format!("Missing value on line: {line}"))?;
let value = RpcValue::from_cpon(value).map_err(|err| format!("Cannot parse a CPON value: `{value}` on line: {line}, error: {err}"))?;
let short_time = parts_iter.next().unwrap_or_default().parse().unwrap_or(-1);
let domain = parts_iter.next();
let value_flags = ValueFlags::from_bits_retain(parts_iter.next().unwrap_or_default().parse().unwrap_or(0));
let user_id = parts_iter
.next()
.map_or_else(|| Ok(RpcValue::null()), |user_id| if user_id.is_empty() {
Ok(RpcValue::null())
} else {
RpcValue::from_cpon(user_id)
.map_err(|err| format!("Cannot parse user_id from CPON: `{user_id}` on line: {line}, error: {err}"))
})?;
Ok(JournalEntry {
epoch_msec,
path,
signal: domain.unwrap_or(SIG_CHNG).into(),
source: METH_GET.into(),
value,
access_level: AccessLevel::Read as _,
short_time,
user_id,
repeat: !value_flags.contains(ValueFlags::SPONTANEOUS),
provisional: value_flags.contains(ValueFlags::PROVISIONAL),
})
}
pub struct JournalReaderLog2<R> {
lines: Lines<R>,
}
impl<R> JournalReaderLog2<R>
where
R: AsyncBufRead + Unpin,
{
pub fn new(reader: R) -> Self {
JournalReaderLog2 {
lines: reader.lines(),
}
}
}
impl<R> Stream for JournalReaderLog2<R>
where
R: AsyncBufRead + Unpin,
{
type Item = Result<JournalEntry, Box<dyn Error + Send + Sync>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let lines = Pin::new(&mut self.lines);
match lines.poll_next(cx) {
Poll::Ready(Some(Ok(line))) => Poll::Ready(Some(parse_journal_entry_log2(&line))),
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(Box::new(e)))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}
pub struct JournalWriterLog2<W> {
writer: W,
}
impl<W> JournalWriterLog2<W>
where
W: AsyncWrite + Unpin,
{
pub fn new(writer: W) -> Self {
Self {
writer,
}
}
pub async fn append(&mut self, entry: &JournalEntry) -> Result<(), std::io::Error> {
let mut msec = entry.epoch_msec;
if msec == 0 {
msec = shvproto::DateTime::now().epoch_msec();
}
self.append_with_time(msec, msec, entry).await
}
pub async fn append_with_time(&mut self, msec: i64, orig_time: i64, entry: &JournalEntry) -> Result<(), std::io::Error> {
let line = [
shvproto::DateTime::from_epoch_msec(msec).to_iso_string(),
if orig_time == msec { "".into() } else { shvproto::DateTime::from_epoch_msec(orig_time).to_iso_string() },
entry.path.clone(),
entry.value.to_cpon(),
if entry.short_time >= 0 { entry.short_time.to_string() } else { "".into() },
entry.signal.clone(),
{
let mut value_flags = ValueFlags::empty();
if !entry.repeat {
value_flags.insert(ValueFlags::SPONTANEOUS);
}
if entry.provisional {
value_flags.insert(ValueFlags::PROVISIONAL);
}
value_flags.bits().to_string()
},
if entry.user_id.is_null() { "".into() } else { entry.user_id.to_cpon() },
].join(JOURNAL_ENTRIES_SEPARATOR) + "\n";
self.writer.write_all(line.as_bytes()).await?;
self.writer.flush().await
}
}
pub fn datetime_to_log3_filename(dt: shvproto::DateTime) -> String {
dt
.to_chrono_datetime()
.format("%Y-%m-%dT%H-%M-%S.log3")
.to_string()
}
#[repr(i32)]
pub enum Log3Key {
Time = 1,
Path,
Signal,
Source,
Value,
AccessLevel,
UserId,
Repeat,
Provisional,
}
pub fn journal_entry_to_log3_imap(entry: JournalEntry) -> shvproto::IMap {
let time = match entry.epoch_msec {
..=0 => None,
val => Some(shvproto::DateTime::from_epoch_msec(val)),
};
let mut imap = shvproto::IMap::new();
imap.insert(Log3Key::Time as _, time.into());
if !entry.path.is_empty() {
imap.insert(Log3Key::Path as _, entry.path.into());
}
if entry.signal != SIG_CHNG {
imap.insert(Log3Key::Signal as _, entry.signal.into());
}
if entry.source != METH_GET {
imap.insert(Log3Key::Source as _, entry.source.into());
}
if !entry.value.is_null() {
imap.insert(Log3Key::Value as _, entry.value);
}
if entry.access_level != AccessLevel::Read as _ {
imap.insert(Log3Key::AccessLevel as _, entry.access_level.into());
}
if !entry.user_id.is_null() {
imap.insert(Log3Key::UserId as _, entry.user_id);
}
if entry.repeat {
imap.insert(Log3Key::Repeat as _, entry.repeat.into());
}
if entry.provisional {
imap.insert(Log3Key::Provisional as _, entry.provisional.into());
}
imap
}
pub struct JournalWriterLog3<W> {
writer: W,
}
impl<W> JournalWriterLog3<W>
where
W: AsyncWrite + Unpin,
{
pub async fn new(mut writer: W) -> std::io::Result<Self> {
let header = RpcValue::from(shvproto::make_map!("version" => shvproto::Decimal::new(3, 0))).to_cpon() + "\n";
Self::write_bytes(&mut writer, header.as_bytes()).await?;
Ok(Self {
writer,
})
}
pub async fn append(&mut self, entry: JournalEntry) -> std::io::Result<()> {
let line = RpcValue::from(journal_entry_to_log3_imap(entry)).to_cpon() + "\n";
Self::write_bytes(&mut self.writer, line.as_bytes()).await
}
async fn write_bytes(writer: &mut W, bytes: &[u8]) -> std::io::Result<()> {
writer.write_all(bytes).await?;
writer.flush().await
}
}
pub struct JournalReaderLog3<R> {
lines: Lines<R>,
got_header: bool,
}
impl<R> JournalReaderLog3<R>
where
R: AsyncBufRead + Unpin,
{
pub fn new(reader: R) -> Self {
JournalReaderLog3 {
lines: reader.lines(),
got_header: false,
}
}
}
fn parse_journal_entry_log3(line: impl AsRef<str>) -> Result<JournalEntry, Box<dyn Error + Send + Sync>> {
let line = line.as_ref();
let entry: shvproto::IMap = RpcValue::from_cpon(line)
.map_err(|err| err.to_string())
.and_then(TryInto::try_into)
.map_err(|err| format!("Cannot parse a journal entry from line: `{line}`: {err}"))?;
let epoch_msec = entry
.get(&(Log3Key::Time as _))
.map_or(0, |v| v.as_datetime().epoch_msec());
let path = entry
.get(&(Log3Key::Path as _))
.map_or_else(String::new, |v| v.as_str().to_string());
let signal = entry
.get(&(Log3Key::Signal as _))
.map_or_else(|| SIG_CHNG.into(), |v| v.as_str().into());
let source = entry
.get(&(Log3Key::Source as _))
.map_or_else(|| METH_GET.into(), |v| v.as_str().into());
let value = entry
.get(&(Log3Key::Value as _))
.map_or_else(RpcValue::null, RpcValue::clone);
let access_level = entry
.get(&(Log3Key::AccessLevel as _))
.map_or(AccessLevel::Read as _, RpcValue::as_i32);
let user_id = entry
.get(&(Log3Key::UserId as _))
.map_or_else(RpcValue::null, RpcValue::clone);
let repeat = entry
.get(&(Log3Key::Repeat as _))
.is_some_and(RpcValue::as_bool);
let provisional = entry
.get(&(Log3Key::Provisional as _))
.is_some_and(RpcValue::as_bool);
Ok(JournalEntry {
epoch_msec,
path,
signal,
source,
value,
access_level,
short_time: -1,
user_id,
repeat,
provisional,
})
}
impl<R> Stream for JournalReaderLog3<R>
where
R: AsyncBufRead + Unpin,
{
type Item = Result<JournalEntry, Box<dyn Error + Send + Sync>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let lines = Pin::new(&mut self.lines);
match lines.poll_next(cx) {
Poll::Ready(Some(Ok(line))) if !self.got_header => {
let header_validation_res = RpcValue::from_cpon(&line)
.map_err(|err| err.to_string())
.and_then(TryInto::try_into)
.and_then(|header: shvproto::Map| header
.get("version")
.cloned()
.ok_or_else(|| "Missing version".into())
)
.and_then(|version|
if version == shvproto::Decimal::new(3, 0).into() {
Ok(())
} else {
Err(format!("Expected log version 3.0, have {version}"))
}
)
.map_err(|err| format!("Wrong journal file header: `{line}`: {err}"));
if let Err(err) = header_validation_res {
return Poll::Ready(Some(Err(err.into())))
}
self.got_header = true;
self.poll_next(cx)
}
Poll::Ready(Some(Ok(line))) => Poll::Ready(Some(parse_journal_entry_log3(&line))),
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(Box::new(e)))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}
pub struct Log2Reader {
log: std::vec::IntoIter<RpcValue>,
header: Log2Header,
}
impl Log2Reader {
pub fn new(log: RpcValue) -> Result<Self, Box<dyn Error>> {
let shvproto::Value::List(list) = log.value else {
return Err("Wrong log format - not a list".into());
};
Ok(Self {
log: list.into_iter(),
header: (*log.meta.unwrap_or_default()).try_into()?,
})
}
pub fn header(&self) -> &Log2Header {
&self.header
}
}
impl Iterator for Log2Reader {
type Item = Result<JournalEntry, Box<dyn Error>>;
fn next(&mut self) -> Option<Self::Item> {
self.log.next().map(|entry| rpcvalue_to_journal_entry(&entry, &self.header.paths_dict))
}
}
const NO_SHORT_TIME: i32 = -1;
fn rpcvalue_to_journal_entry(entry: &RpcValue, paths_dict: &BTreeMap<i32, String>) -> Result<JournalEntry, Box<dyn Error>> {
let make_err = |msg| Err(format!("{msg}: {}", entry.to_cpon()).into());
let shvproto::Value::List(row) = &entry.value else {
return make_err("Log entry is not a list");
};
let mut row = row.as_slice().iter();
let timestamp = row.next().unwrap_or_default();
let timestamp = match ×tamp.value {
shvproto::Value::DateTime(date_time) => *date_time,
_ => return make_err(&format!("Wrong `timestamp` `{}` of journal entry", timestamp.to_cpon())),
};
let path = row.next().unwrap_or_default();
let path = match &path.value {
#[expect(clippy::cast_possible_truncation, reason = "We don't care")]
shvproto::Value::Int(idx) => paths_dict
.get(&(*idx as i32))
.ok_or_else(|| format!("Wrong path reference {idx} of journal entry: {}", entry.to_cpon()))?,
shvproto::Value::String(path) => path,
_ => return make_err(&format!("Wrong path `{}` of journal entry", path.to_cpon())),
}.clone();
let value = row.next().unwrap_or_default();
let short_time = row.next().unwrap_or_default();
let short_time = match short_time.value {
#[expect(clippy::cast_possible_truncation, reason = "We don't care")]
shvproto::Value::Int(val) if val as i32 >= 0 => val as _,
_ => NO_SHORT_TIME,
};
let domain = row.next().unwrap_or_default();
let signal = match &domain.value {
shvproto::Value::String(domain) if domain.is_empty() || domain.as_str() == "C" => SIG_CHNG,
shvproto::Value::String(domain) => domain.as_str(),
_ => SIG_CHNG,
}.to_string();
let value_flags = row.next();
let value_flags = match value_flags {
Some(value_flags) => match &value_flags.value {
shvproto::Value::UInt(val) => *val,
shvproto::Value::Int(val) => val.cast_unsigned(),
_ => return make_err(&format!("Wrong `valueFlags` {} of journal entry", value_flags.to_cpon())),
},
None => 0,
};
let value_flags = ValueFlags::from_bits_retain(value_flags);
let user_id = row.next().map_or_else(RpcValue::null, RpcValue::clone);
Ok(JournalEntry {
epoch_msec: timestamp.epoch_msec(),
path,
signal,
source: METH_GET.into(),
value: value.clone(),
access_level: AccessLevel::Read as _,
short_time,
user_id,
repeat: !value_flags.contains(ValueFlags::SPONTANEOUS),
provisional: value_flags.contains(ValueFlags::PROVISIONAL),
})
}
pub(crate) fn journal_entry_to_rpclist(
entry: &JournalEntry,
path_cache: Option<&mut BTreeMap<String, i32>>,
) -> shvproto::List {
let path_value: RpcValue = path_cache.map_or_else(|| entry.path.clone().into(), |cache| {
if let Some(&idx) = cache.get(&entry.path) {
return idx.into();
}
#[expect(clippy::cast_possible_wrap, clippy::cast_possible_truncation, reason ="we hope we don't have too many paths")]
let new_idx = cache.len() as i32;
cache.insert(entry.path.clone(), new_idx);
new_idx.into()
});
let mut value_flags = ValueFlags::empty();
if !entry.repeat {
value_flags.insert(ValueFlags::SPONTANEOUS);
}
if entry.provisional {
value_flags.insert(ValueFlags::PROVISIONAL);
}
let domain = if entry.signal == SIG_CHNG {
"".into()
} else {
entry.signal.clone()
};
shvproto::make_list!(
shvproto::DateTime::from_epoch_msec(entry.epoch_msec),
path_value,
entry.value.clone(),
entry.short_time,
domain,
value_flags.bits(),
entry.user_id.clone(),
)
}
pub fn journal_entries_to_rpcvalue<'a>(
entries: impl IntoIterator<Item = &'a JournalEntry>,
with_paths_dict: bool
) -> (RpcValue, BTreeMap<i32, String>)
{
let mut path_cache = with_paths_dict.then(BTreeMap::new);
let result: RpcValue = entries
.into_iter()
.map(|entry| journal_entry_to_rpclist(entry, path_cache.as_mut()))
.collect::<Vec<_>>()
.into();
let paths_dict = path_cache
.map_or_else(Default::default, |cache| cache
.into_iter()
.map(|(k,v)| (v, k))
.collect()
);
(result, paths_dict)
}
#[derive(Clone, Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub enum GetLog2Since {
DateTime(shvproto::DateTime),
LastEntry,
None,
}
impl Display for GetLog2Since {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let str = match self {
GetLog2Since::DateTime(date_time) => date_time.to_iso_string(),
GetLog2Since::LastEntry => "last".into(),
GetLog2Since::None => "none".into(),
};
write!(f, "{str}")
}
}
#[derive(Clone, Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub struct GetLog2Params {
pub since: GetLog2Since,
pub until: Option<shvproto::DateTime>,
pub path_pattern: Option<String>,
pub with_paths_dict: bool,
pub with_snapshot: bool,
pub record_count_limit: i64,
}
impl Display for GetLog2Params {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let until = self.until
.map_or_else(
|| Cow::Borrowed("<none>"),
|dt| Cow::Owned(dt.to_iso_string())
);
let path_pattern = self.path_pattern
.as_ref()
.map_or("<none>", String::as_str);
write!(f, "since: {since}, until: {until}, path_pattern: {path_pattern}, with_paths_dict: {with_paths_dict}, with_snapshot: {with_snapshot}, record_count_limit: {record_count_limit}",
since = self.since,
with_paths_dict = self.with_paths_dict,
with_snapshot = self.with_snapshot,
record_count_limit = self.record_count_limit,
)
}
}
pub(crate) const RECORD_COUNT_LIMIT_DEFAULT: i64 = 10000;
impl Default for GetLog2Params {
fn default() -> Self {
Self {
since: GetLog2Since::None,
until: None,
path_pattern: None,
with_paths_dict: true,
with_snapshot: false,
record_count_limit: RECORD_COUNT_LIMIT_DEFAULT,
}
}
}
impl From<GetLog2Params> for RpcValue {
fn from(value: GetLog2Params) -> Self {
let mut map = shvproto::Map::new();
match value.since {
GetLog2Since::DateTime(dt) => {
map.insert("since".into(), dt.into());
}
GetLog2Since::LastEntry => {
map.insert("since".into(), "last".into());
}
GetLog2Since::None => { }
}
if let Some(until) = value.until {
map.insert("until".into(), until.into());
}
if let Some(path_pattern) = value.path_pattern {
map.insert("pathPattern".into(), path_pattern.into());
}
map.insert("withPathsDict".into(), value.with_paths_dict.into());
map.insert("withSnapshot".into(), value.with_snapshot.into());
map.insert("recordCountLimit".into(), value.record_count_limit.into());
map.into()
}
}
impl TryFrom<&RpcValue> for GetLog2Params {
type Error = String;
fn try_from(value: &RpcValue) -> Result<Self, Self::Error> {
let shvproto::Value::Map(map) = &value.value else {
return Err(format!("getLog params has wrong type, expected Map, got {}", value.type_name()));
};
let since = match map.get("since") {
Some(since) => match &since.value {
shvproto::Value::Null => GetLog2Since::None,
shvproto::Value::DateTime(dt) => GetLog2Since::DateTime(*dt),
shvproto::Value::String(val) if val.as_str() == "last" => GetLog2Since::LastEntry,
_ => return Err("Invalid `since` value, expected a DateTime or \"last\"".into()),
}
None => GetLog2Since::None,
};
let until = match map.get("until") {
Some(until) => Some(until.to_datetime().ok_or_else(|| "Invalid `until` value type".to_string())?),
None => None,
};
let path_pattern = match map.get("pathPattern").map(|v| &v.value) {
Some(shvproto::Value::String(path_pattern)) => Some(path_pattern.to_string()),
Some(_) => return Err("Invalid `pathPattern` type".into()),
None => None,
};
let with_paths_dict = match map.get("withPathsDict").map(|v| &v.value) {
Some(shvproto::Value::Bool(val)) => *val,
Some(_) => return Err("Invalid `withPathsDict` type".into()),
None => true,
};
let with_snapshot = match map.get("withSnapshot").map(|v| &v.value) {
Some(shvproto::Value::Bool(val)) => *val,
Some(_) => return Err("Invalid `withSnapshot` type".into()),
None => false,
};
let record_count_limit = match map.get("recordCountLimit").map(|v| &v.value) {
Some(shvproto::Value::Int(val)) => *val,
Some(_) => return Err("Invalid `recordCountLimit` type".into()),
None => RECORD_COUNT_LIMIT_DEFAULT,
};
Ok(Self { since, until, path_pattern, with_paths_dict, with_snapshot, record_count_limit })
}
}
pub fn matches_path_pattern(path: impl AsRef<str>, pattern: impl AsRef<str>) -> bool {
let path_parts: Vec<&str> = path.as_ref().split('/').collect();
let pattern_parts: Vec<&str> = pattern.as_ref().split('/').collect();
let (mut path_ix, mut pattern_ix) = (0, 0);
let (mut last_starstar_pattern_ix, mut last_starstar_path_ix) = (None, 0);
while path_ix < path_parts.len() {
match pattern_parts.get(pattern_ix).copied() {
Some("**") => {
last_starstar_pattern_ix = Some(pattern_ix);
last_starstar_path_ix = path_ix;
pattern_ix += 1;
}
Some("*") => {
path_ix += 1;
pattern_ix += 1;
}
Some(literal) if literal == *path_parts.get(path_ix).expect("The bound is checked above") => {
path_ix += 1;
pattern_ix += 1;
}
_ => {
if let Some(starstart_patt_ix) = last_starstar_pattern_ix {
last_starstar_path_ix += 1;
path_ix = last_starstar_path_ix;
pattern_ix = starstart_patt_ix + 1;
} else {
return false;
}
}
}
}
while pattern_parts.get(pattern_ix).copied() == Some("**") {
pattern_ix += 1;
}
pattern_ix == pattern_parts.len()
}
#[derive(Clone, Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub struct Log2Header {
pub record_count: i64,
pub record_count_limit: i64,
pub record_count_limit_hit: bool,
pub date_time: shvproto::DateTime,
pub since: shvproto::DateTime,
pub until: shvproto::DateTime,
pub with_paths_dict: bool,
pub with_snapshot: bool,
pub paths_dict: BTreeMap<i32, String>,
pub log_params: GetLog2Params,
pub log_version: i64,
}
impl Display for Log2Header {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Log2Header {{\n\
\trecord_count: {record_count},\n\
\trecord_count_limit: {record_count_limit},\n\
\trecord_count_limit_hit: {record_count_limit_hit},\n\
\tdate_time: {date_time},\n\
\tsince: {since},\n\
\tuntil: {until},\n\
\twith_paths_dict: {with_paths_dict},\n\
\twith_snapshot: {with_snapshot},\n\
\tlog_params: {log_params},\n\
\tlog_version: {log_version}\n\
}}",
record_count = self.record_count,
record_count_limit = self.record_count_limit,
record_count_limit_hit = self.record_count_limit_hit,
date_time = self.date_time.to_iso_string(),
since = self.since.to_iso_string(),
until = self.until.to_iso_string(),
with_paths_dict = self.with_paths_dict,
with_snapshot = self.with_snapshot,
log_params = self.log_params,
log_version = self.log_version,
)
}
}
impl TryFrom<shvproto::MetaMap> for Log2Header {
type Error = String;
fn try_from(meta: shvproto::MetaMap) -> Result<Self, Self::Error> {
let current_datetime = shvproto::DateTime::now();
let record_count = match meta.get("recordCount").map(|v| &v.value) {
Some(shvproto::Value::Int(record_count)) => *record_count,
Some(v) => return Err(format!("Invalid `recordCount` type: {}", v.type_name())),
None => 0,
};
let record_count_limit = match meta.get("recordCountLimit").map(|v| &v.value) {
Some(shvproto::Value::Int(record_count_limit)) => *record_count_limit,
Some(v) => return Err(format!("Invalid `recordCountLimit` type: {}", v.type_name())),
None => 0,
};
let record_count_limit_hit = match meta.get("recordCountLimitHit").map(|v| &v.value) {
Some(shvproto::Value::Bool(record_count_limit_hit)) => *record_count_limit_hit,
Some(v) => return Err(format!("Invalid `recordCountLimitHit` type: {}", v.type_name())),
None => false,
};
let date_time = match meta.get("dateTime").map(|v| &v.value) {
Some(shvproto::Value::DateTime(date_time)) => *date_time,
Some(shvproto::Value::Null) | None => current_datetime,
Some(v) => return Err(format!("Invalid `dateTime` type: {}", v.type_name())),
};
let since = match meta.get("since").map(|v| &v.value) {
Some(shvproto::Value::DateTime(since)) => *since,
Some(shvproto::Value::Null) | None => current_datetime,
Some(v) => return Err(format!("Invalid `since` type: {}", v.type_name())),
};
let until = match meta.get("until").map(|v| &v.value) {
Some(shvproto::Value::DateTime(until)) => *until,
Some(shvproto::Value::Null) | None => current_datetime,
Some(v) => return Err(format!("Invalid `until` type: {}", v.type_name())),
};
let with_paths_dict = match meta.get("withPathsDict").map(|v| &v.value) {
Some(shvproto::Value::Bool(val)) => *val,
Some(v) => return Err(format!("Invalid `withPathsDict` type: {}", v.type_name())),
None => true,
};
let with_snapshot = match meta.get("withSnapshot").map(|v| &v.value) {
Some(shvproto::Value::Bool(val)) => *val,
Some(v) => return Err(format!("Invalid `withSnapshot` type: {}", v.type_name())),
None => false,
};
let paths_dict: BTreeMap<i32, String> = match meta.get("pathsDict").map(|v| &v.value) {
Some(shvproto::Value::IMap(val)) => val
.iter()
.map(|(i, v)| v
.try_into()
.map(|s| (*i, s)))
.collect::<Result<BTreeMap<_, _>,_>>()
.map_err(|e| format!("Corrupted paths dictionary: {e}"))?,
Some(v) => return Err(format!("Invalid `pathsDict` type: {}", v.type_name())),
None => BTreeMap::default(),
};
let log_params = match meta.get("logParams") {
Some(val) => GetLog2Params::try_from(val)?,
None => GetLog2Params::try_from(&shvproto::Map::new().into())?,
};
let log_version = match meta.get("logVersion").map(|v| &v.value) {
Some(shvproto::Value::Int(val)) => *val,
Some(v) => return Err(format!("Invalid `logVersion` type: {}", v.type_name())),
None => 2,
};
Ok(Self {
record_count,
record_count_limit,
record_count_limit_hit,
date_time,
since,
until,
with_paths_dict,
with_snapshot,
paths_dict,
log_params,
log_version,
})
}
}
impl From<Log2Header> for shvproto::MetaMap {
fn from(value: Log2Header) -> Self {
let mut meta = shvproto::MetaMap::new();
meta.insert("recordCount", value.record_count.into());
meta.insert("recordCountLimit", value.record_count_limit.into());
meta.insert("recordCountLimitHit", value.record_count_limit_hit.into());
meta.insert("dateTime", value.date_time.into());
meta.insert("since", value.since.into());
meta.insert("until", value.until.into());
meta.insert("withPathsDict", value.with_paths_dict.into());
meta.insert("withSnapshot", value.with_snapshot.into());
meta.insert("pathsDict", value.paths_dict.into());
meta.insert("logParams", value.log_params.into());
meta.insert("logVersion", value.log_version.into());
meta
}
}
#[cfg(test)]
mod tests {
use futures::io::{BufReader, Cursor};
use futures::StreamExt;
use macro_rules_attribute::apply;
use shvproto::{CponReader, Reader};
use crate::metamethod::AccessLevel;
use super::{METH_GET, SIG_CHNG};
use smol::fs::File;
use smol_macros::{test, Executor};
use crate::journalentry::JournalEntry;
use crate::journalrw::{JournalReaderLog2, JournalReaderLog3, JournalWriterLog2, JournalWriterLog3, Log2Reader, matches_path_pattern};
use super::{
journal_entries_to_rpcvalue, parse_journal_entry_log2, Log2Header,
NO_SHORT_TIME, RECORD_COUNT_LIMIT_DEFAULT,
};
#[apply(test!)]
async fn read_file_journal(ex: &Executor<'_>) {
ex.spawn(async move {
let file = File::open("tests/test.log2").await.unwrap();
let mut reader = JournalReaderLog2::new(BufReader::new(file));
while let Some(result) = reader.next().await {
println!("{:?}", result.unwrap());
}
}).await;
}
#[apply(test!)]
async fn journal_write_and_read_log2(ex: &Executor<'_>) {
ex.spawn(async move {
let entries = [
JournalEntry {
epoch_msec: shvproto::DateTime::now().epoch_msec(),
path: "test/path".into(),
signal: SIG_CHNG.into(),
source: METH_GET.into(),
value: 42.into(),
access_level: AccessLevel::Read as _,
short_time: 0,
user_id: "user".into(),
repeat: false,
provisional: false,
},
JournalEntry {
epoch_msec: shvproto::DateTime::now().epoch_msec(),
path: "test/path2".into(),
signal: SIG_CHNG.into(),
source: METH_GET.into(),
value: shvproto::make_map!("a" => 1, "b" => 2).into(),
access_level: AccessLevel::Read as _,
short_time: 123,
user_id: ().into(),
repeat: true,
provisional: true,
},
JournalEntry {
epoch_msec: shvproto::DateTime::now().epoch_msec(),
path: "".into(),
signal: "foo".into(),
source: METH_GET.into(), value: shvproto::make_map!("a" => 1, "b" => 2).into(),
access_level: AccessLevel::Read as _,
short_time: -1,
user_id: shvproto::make_map!("userName" => "abc", "agent" => "xyz").into(),
repeat: false,
provisional: true,
},
];
let mut writer = JournalWriterLog2::new(Cursor::new(Vec::new()));
for entry in &entries {
writer.append(entry).await.unwrap();
}
let data = writer.writer.into_inner();
let reader = JournalReaderLog2::new(Cursor::new(data));
let mut enumerated_reader = reader.enumerate();
while let Some((ix, result)) = enumerated_reader.next().await {
let entry = &entries[ix];
let entry2 = result.unwrap();
assert_eq!(entry, &entry2);
}
}).await;
}
#[apply(test!)]
async fn journal_write_and_read_log3(ex: &Executor<'_>) {
ex.spawn(async move {
let entries = [
JournalEntry {
epoch_msec: shvproto::DateTime::now().epoch_msec(),
path: "test/path".into(),
signal: SIG_CHNG.into(),
source: METH_GET.into(),
value: 42.into(),
access_level: AccessLevel::Read as _,
short_time: -1,
user_id: "user".into(),
repeat: false,
provisional: false,
},
JournalEntry {
epoch_msec: shvproto::DateTime::now().epoch_msec(),
path: "test/path2".into(),
signal: SIG_CHNG.into(),
source: METH_GET.into(),
value: shvproto::make_map!("a" => 1, "b" => 2).into(),
access_level: AccessLevel::Read as _,
short_time: -1,
user_id: ().into(),
repeat: true,
provisional: true,
},
JournalEntry {
epoch_msec: shvproto::DateTime::now().epoch_msec(),
path: "".into(),
signal: "foo".into(),
source: "bar".into(),
value: shvproto::make_map!("a" => 1, "b" => 2).into(),
access_level: AccessLevel::Read as _,
short_time: -1,
user_id: shvproto::make_map!("userName" => "abc", "agent" => "xyz").into(),
repeat: false,
provisional: true,
},
];
let mut writer = JournalWriterLog3::new(Cursor::new(Vec::new())).await.unwrap();
for entry in &entries {
writer.append(entry.clone()).await.unwrap();
}
let data = writer.writer.into_inner();
let reader = JournalReaderLog3::new(Cursor::new(data));
let mut enumerated_reader = reader.enumerate();
while let Some((ix, result)) = enumerated_reader.next().await {
let entry = &entries[ix];
let entry2 = result.unwrap();
assert_eq!(entry, &entry2);
}
}).await;
}
#[test]
fn get_log_2() {
let mut file = std::fs::File::open("tests/log2.cpon").unwrap();
let mut reader = CponReader::new(&mut file);
let reader = Log2Reader::new(reader.read().unwrap()).unwrap();
let epoch_ms_now = shvproto::DateTime::now().epoch_msec();
let res = reader
.map(|item|
item.map(|mut entry| {
if entry.epoch_msec == 0 {
entry.epoch_msec = epoch_ms_now;
}
entry
})
)
.collect::<Result<Vec<_>, _>>()
.unwrap();
println!("{res:?}");
}
fn make_journal_entry(
epoch_msec: i64,
path: &str,
value: impl Into<shvproto::RpcValue>,
repeat: bool,
provisional: bool,
signal: &str,
) -> JournalEntry {
JournalEntry {
epoch_msec,
path: path.to_string(),
signal: signal.to_string(),
source: "get".to_string(),
value: value.into(),
access_level: AccessLevel::Read as _,
short_time: NO_SHORT_TIME,
user_id: "testuser".into(),
repeat,
provisional,
}
}
#[test]
fn log2_journal_to_from_rpcvalue() {
let entries = [
make_journal_entry(1000, "foo", 123, true, false, SIG_CHNG),
make_journal_entry(2000, "bar", false, true, false, SIG_CHNG),
make_journal_entry(3000, "x/y/z", false, true, false, SIG_CHNG),
make_journal_entry(4000, "bar", "asdf", false, false, SIG_CHNG),
make_journal_entry(5000, "baz", 0, false, false, SIG_CHNG),
make_journal_entry(5000, "foo", 10, false, true, SIG_CHNG),
];
{
let (mut rv, paths_dict) = journal_entries_to_rpcvalue(&entries, false);
#[expect(clippy::cast_possible_wrap, reason = "Not many entries to truncate")]
let header = Log2Header {
record_count: entries.len() as _,
record_count_limit: RECORD_COUNT_LIMIT_DEFAULT,
record_count_limit_hit: false,
date_time: shvproto::DateTime::from_epoch_msec(1000),
since: shvproto::DateTime::from_epoch_msec(entries.first().unwrap().epoch_msec),
until: shvproto::DateTime::from_epoch_msec(entries.last().unwrap().epoch_msec),
with_paths_dict: false,
with_snapshot: false,
paths_dict,
log_params: super::GetLog2Params::default(),
log_version: 2,
};
rv.meta = Some(Box::new(header.clone().into()));
let reader = Log2Reader::new(rv).unwrap();
assert_eq!(reader.header, header);
assert_eq!(reader.collect::<Result<Vec<_>,_>>().unwrap(), entries);
}
}
#[test]
fn path_pattern_match() {
assert!(matches_path_pattern("foo/bar/x/bar/baz", "foo/**/bar/baz"));
assert!(matches_path_pattern("foo/a/b/c/bar/baz", "foo/**/bar/baz"));
assert!(matches_path_pattern("foo/bar/baz", "foo/**/bar/baz"));
assert!(!matches_path_pattern("foo/bar/x/baz", "foo/**/bar/baz"));
assert!(!matches_path_pattern("foo/bar/x/bar", "foo/**/bar/baz"));
}
#[test]
fn parse_journal_entry_log2_variants() {
assert_eq!(parse_journal_entry_log2(
"2025-04-28T11:51:14.300Z\t\tsystem/status\t2u\t\tchng\t1").unwrap(),
JournalEntry {
epoch_msec: shvproto::DateTime::from_iso_str("2025-04-28T11:51:14.300Z").unwrap().epoch_msec(),
path: "system/status".into(),
signal: "chng".into(),
source: "get".into(),
value: 2u64.into(),
access_level: AccessLevel::Read as i32,
short_time: -1,
user_id: ().into(),
repeat: true,
provisional: false,
}
);
assert_eq!(parse_journal_entry_log2(
"2025-04-28T11:51:14.300Z\t\tsystem/status\t2u").unwrap(),
JournalEntry {
epoch_msec: shvproto::DateTime::from_iso_str("2025-04-28T11:51:14.300Z").unwrap().epoch_msec(),
path: "system/status".into(),
signal: "chng".into(),
source: "get".into(),
value: 2u64.into(),
access_level: AccessLevel::Read as i32,
short_time: -1,
user_id: ().into(),
repeat: true,
provisional: false,
}
);
let line = "2025-04-28T11:51:14.300Z\t\tsystem/status";
assert_eq!(parse_journal_entry_log2(line).unwrap_err().to_string(), format!("Missing value on line: {line}"));
}
}