use crate::protocol::Notification;
use serde_json::Value;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::fs::{self, OpenOptions};
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::sync::Mutex;
use tracing::{debug, error, warn};
/// Appends notifications to JSON-lines files under a base directory.
///
/// Cloning is cheap: the open-file cache lives behind an `Arc`, so every clone
/// shares the same set of cached writers.
#[derive(Clone, Debug)]
pub struct NotificationFileWriter {
    /// Directory in which the `.jsonl` files are created.
    base_path: PathBuf,
    /// Open buffered writers, keyed by `<date>__<sanitized record name>`.
    file_cache: Arc<Mutex<HashMap<String, BufWriter<fs::File>>>>,
    /// When `false`, every operation on the writer is a no-op.
    enabled: bool,
}
/// One record as serialized into a notification file (a single JSON object
/// per line). Field order here is the key order of the serialized object.
#[derive(Debug, serde::Serialize)]
struct NotificationEntry {
/// The `timestamp` parameter copied from the notification.
/// NOTE(review): unit (seconds vs. milliseconds) is not visible here - confirm with the producer.
timestamp: i64,
/// The `value` parameter copied from the notification, stored verbatim.
value: Value,
/// Optional `sequence` parameter; the key is omitted from the output when absent.
#[serde(skip_serializing_if = "Option::is_none")]
sequence: Option<u64>,
}
impl NotificationFileWriter {
    /// Creates a writer that stores notification files under `base_path`.
    ///
    /// When `enabled` is `true`, `base_path` (including missing parents) is
    /// created up front. When `false`, nothing touches the file system and all
    /// later calls on the writer are no-ops.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from creating `base_path` when `enabled` is `true`.
    pub async fn new(base_path: PathBuf, enabled: bool) -> Result<Self, std::io::Error> {
        if enabled {
            fs::create_dir_all(&base_path).await?;
            debug!(
                "Notification file writer initialized at {}",
                base_path.display()
            );
        } else {
            debug!("Notification file writer disabled");
        }
        Ok(Self {
            base_path,
            file_cache: Arc::new(Mutex::new(HashMap::new())),
            enabled,
        })
    }

    /// Appends `notification` as one JSON line to the file for its record.
    ///
    /// The target file is `<base_path>/<YYYY-MM-DD>__<sanitized record>.jsonl`,
    /// where the date is the current UTC date at the time of the call (not the
    /// notification's own timestamp). Files are opened in append mode on first
    /// use and the buffered writer is cached for subsequent calls. The line is
    /// flushed before returning.
    ///
    /// A notification from which no record data can be extracted is logged and
    /// skipped; this is not treated as an error.
    ///
    /// # Errors
    ///
    /// Returns an error if the entry cannot be serialized
    /// (`ErrorKind::InvalidData`), or if opening, writing or flushing the file
    /// fails.
    pub async fn write(&self, notification: &Notification) -> Result<(), std::io::Error> {
        if !self.enabled {
            return Ok(());
        }
        let (record_name, value, timestamp, sequence) =
            match extract_notification_data(notification) {
                Some(data) => data,
                None => {
                    warn!("⚠️ Failed to extract record data from notification");
                    return Ok(());
                }
            };

        // Serialize before taking the lock or opening the file, so a
        // serialization failure neither creates an empty file nor holds the
        // cache lock longer than necessary.
        let entry = NotificationEntry {
            timestamp,
            value,
            sequence,
        };
        let mut line = serde_json::to_string(&entry).map_err(|e| {
            error!("Failed to serialize notification entry: {}", e);
            std::io::Error::new(std::io::ErrorKind::InvalidData, e)
        })?;
        // Line and terminator go out in a single write.
        line.push('\n');

        let date = chrono::Utc::now().format("%Y-%m-%d").to_string();
        let file_key = format!("{}__{}", date, sanitize_record_name(&record_name));
        let file_path = self.base_path.join(format!("{}.jsonl", file_key));

        let mut cache = self.file_cache.lock().await;
        // The entry API gives a single lookup and no `unwrap` after insertion.
        let writer = match cache.entry(file_key) {
            std::collections::hash_map::Entry::Occupied(slot) => slot.into_mut(),
            std::collections::hash_map::Entry::Vacant(slot) => {
                let file = OpenOptions::new()
                    .create(true)
                    .append(true)
                    .open(&file_path)
                    .await
                    .map_err(|e| {
                        error!(
                            "Failed to open notification file {}: {}",
                            file_path.display(),
                            e
                        );
                        e
                    })?;
                debug!(
                    "Opened notification file: {} for record {}",
                    file_path.display(),
                    record_name
                );
                slot.insert(BufWriter::new(file))
            }
        };

        writer.write_all(line.as_bytes()).await?;
        writer.flush().await?;
        debug!(
            "✅ Wrote notification for {} to {}",
            record_name,
            file_path.display()
        );
        Ok(())
    }

    /// Flushes every cached writer.
    ///
    /// All writers are attempted even if one of them fails, so a single bad
    /// file does not leave the others unflushed.
    ///
    /// # Errors
    ///
    /// Returns the first flush error encountered; every failure is logged.
    pub async fn flush_all(&self) -> Result<(), std::io::Error> {
        if !self.enabled {
            return Ok(());
        }
        let mut cache = self.file_cache.lock().await;
        let mut first_error: Option<std::io::Error> = None;
        for (key, writer) in cache.iter_mut() {
            if let Err(e) = writer.flush().await {
                error!("Failed to flush file {}: {}", key, e);
                if first_error.is_none() {
                    first_error = Some(e);
                }
            }
        }
        if let Some(e) = first_error {
            return Err(e);
        }
        debug!("💾 Flushed {} notification files", cache.len());
        Ok(())
    }

    /// Closes cached writers whose key does not start with today's UTC date.
    ///
    /// Each stale writer is flushed before it is dropped, because tokio's
    /// `BufWriter` does not flush on drop. Flush failures are logged and do not
    /// stop the cleanup.
    pub async fn cleanup_old_handles(&self) {
        if !self.enabled {
            return;
        }
        let current_date = chrono::Utc::now().format("%Y-%m-%d").to_string();
        let mut cache = self.file_cache.lock().await;
        let stale_keys: Vec<String> = cache
            .keys()
            .filter(|key| !key.starts_with(current_date.as_str()))
            .cloned()
            .collect();
        for key in stale_keys {
            if let Some(mut writer) = cache.remove(&key) {
                if let Err(e) = writer.flush().await {
                    warn!(
                        "Failed to flush notification file {} before closing: {}",
                        key, e
                    );
                }
            }
            debug!("🧹 Closed old notification file: {}", key);
        }
        debug!("🧹 Cleaned up old file handles, {} active", cache.len());
    }
}
fn extract_notification_data(
notification: &Notification,
) -> Option<(String, Value, i64, Option<u64>)> {
let params = notification.params.as_ref()?;
let record_name = params.get("record_name")?.as_str()?.to_string();
let value = params.get("value")?.clone();
let timestamp = params.get("timestamp")?.as_i64()?;
let sequence = params.get("sequence").and_then(|s| s.as_u64());
Some((record_name, value, timestamp, sequence))
}
/// Turns a record name into a string that is safe to embed in a file name.
///
/// ASCII letters, ASCII digits, `_` and `-` are kept as they are; every other
/// character (including `:`, `/`, `.` and any non-ASCII character) becomes a
/// single `_`.
fn sanitize_record_name(name: &str) -> String {
    let mut sanitized = String::with_capacity(name.len());
    for ch in name.chars() {
        let keep = ch.is_ascii_alphanumeric() || ch == '_' || ch == '-';
        sanitized.push(if keep { ch } else { '_' });
    }
    sanitized
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Characters outside `[A-Za-z0-9_-]` are each replaced by `_`.
    #[test]
    fn test_sanitize_record_name() {
        assert_eq!(
            sanitize_record_name("server::Temperature"),
            "server__Temperature"
        );
        assert_eq!(sanitize_record_name("foo/bar"), "foo_bar");
        assert_eq!(sanitize_record_name("test.record"), "test_record");
        assert_eq!(sanitize_record_name("valid_name-123"), "valid_name-123");
    }

    /// A disabled writer can be constructed without touching the file system.
    #[tokio::test]
    async fn test_writer_creation_disabled() {
        let writer = NotificationFileWriter::new(PathBuf::from("/tmp/test"), false)
            .await
            .unwrap();
        assert!(!writer.enabled);
    }

    /// All four fields are extracted when present.
    #[tokio::test]
    async fn test_extract_notification_data() {
        let notification = Notification::new(
            "notifications/resources/updated",
            Some(json!({
                "record_name": "server::Temperature",
                "value": {"celsius": 25.0},
                "timestamp": 1234567890,
                "sequence": 42
            })),
        );
        let (record_name, value, timestamp, sequence) = extract_notification_data(&notification)
            .expect("complete notification should yield record data");
        assert_eq!(record_name, "server::Temperature");
        assert_eq!(value, json!({"celsius": 25.0}));
        assert_eq!(timestamp, 1234567890);
        assert_eq!(sequence, Some(42));
    }

    /// `sequence` is optional and comes back as `None` when absent.
    #[tokio::test]
    async fn test_extract_notification_data_no_sequence() {
        let notification = Notification::new(
            "notifications/resources/updated",
            Some(json!({
                "record_name": "server::Status",
                "value": {"status": "ok"},
                "timestamp": 1234567890
            })),
        );
        let (_record_name, _value, _timestamp, sequence) =
            extract_notification_data(&notification)
                .expect("notification without sequence should still yield record data");
        assert_eq!(sequence, None);
    }

    /// A missing required field makes extraction fail as a whole.
    #[tokio::test]
    async fn test_extract_notification_data_missing_timestamp() {
        let notification = Notification::new(
            "notifications/resources/updated",
            Some(json!({
                "record_name": "server::Status",
                "value": {"status": "ok"}
            })),
        );
        assert!(extract_notification_data(&notification).is_none());
    }
}