#![allow(dead_code)]
use crate::notifier::{ActivityNotifier, EventStream};
use async_trait::async_trait;
use futures_util::StreamExt;
use serde_json::Value;
use std::sync::Arc;
#[async_trait]
pub trait ActivitySource: Send + Sync + 'static {
async fn fetch_since(
&self,
tenant_id: &str,
conversation_id: &str,
since_watermark: u64,
) -> Result<(Vec<Value>, u64), String>;
}
#[derive(Debug, thiserror::Error)]
pub enum PumpError {
#[error("replay too large: {count} > {max}")]
ReplayTooLarge { count: usize, max: usize },
#[error("activity source error: {0}")]
Source(String),
#[error("notifier error: {0}")]
Notifier(String),
}
#[derive(Debug)]
pub enum PumpFrame {
Activities {
activities: Vec<Value>,
next_watermark: u64,
},
Error(String),
}
pub struct Pump {
source: Arc<dyn ActivitySource>,
notifier: Arc<dyn ActivityNotifier>,
max_replay_size: usize,
}
impl Pump {
pub fn new(
source: Arc<dyn ActivitySource>,
notifier: Arc<dyn ActivityNotifier>,
max_replay_size: usize,
) -> Self {
Self {
source,
notifier,
max_replay_size,
}
}
pub async fn run(
&self,
tenant_id: String,
conversation_id: String,
initial_watermark: u64,
tx: tokio::sync::mpsc::Sender<PumpFrame>,
) -> Result<(), PumpError> {
eprintln!(
"[ws pump] run start tenant={} conv={} initial_watermark={}",
tenant_id, conversation_id, initial_watermark,
);
let mut events: EventStream = self
.notifier
.subscribe(&tenant_id, &conversation_id)
.await
.map_err(|e| {
eprintln!(
"[ws pump] notifier subscribe FAILED tenant={} conv={} err={}",
tenant_id, conversation_id, e,
);
PumpError::Notifier(e.to_string())
})?;
eprintln!(
"[ws pump] subscribed, calling fetch_since (replay) tenant={} conv={}",
tenant_id, conversation_id,
);
let (replay_activities, mut cursor) = self
.source
.fetch_since(&tenant_id, &conversation_id, initial_watermark)
.await
.map_err(|e| {
eprintln!(
"[ws pump] fetch_since FAILED tenant={} conv={} err={}",
tenant_id, conversation_id, e,
);
PumpError::Source(e)
})?;
eprintln!(
"[ws pump] replay completed tenant={} conv={} activity_count={} cursor={}",
tenant_id,
conversation_id,
replay_activities.len(),
cursor,
);
if replay_activities.len() > self.max_replay_size {
return Err(PumpError::ReplayTooLarge {
count: replay_activities.len(),
max: self.max_replay_size,
});
}
if !replay_activities.is_empty()
&& tx
.send(PumpFrame::Activities {
activities: replay_activities,
next_watermark: cursor,
})
.await
.is_err()
{
return Ok(()); }
eprintln!(
"[ws pump] entering live loop tenant={} conv={} cursor={}",
tenant_id, conversation_id, cursor,
);
while let Some(event) = events.next().await {
eprintln!(
"[ws pump] live event recv tenant={} conv={} event_watermark={} cursor={}",
tenant_id, conversation_id, event.new_watermark, cursor,
);
if event.new_watermark < cursor {
continue;
}
match self
.source
.fetch_since(
&event.tenant_id,
&event.conversation_id,
cursor.saturating_sub(1),
)
.await
{
Ok((activities, new_cursor)) => {
let raw_count = activities.len();
let to_send: Vec<Value> = activities
.into_iter()
.filter(|a| {
a.get("channelData")
.and_then(|cd| cd.get("watermark"))
.and_then(|w| w.as_u64())
.map(|w| w >= cursor)
.unwrap_or(true)
})
.collect();
eprintln!(
"[ws pump] live fetch ok tenant={} conv={} raw={} after_filter={} new_cursor={} cursor_was={}",
tenant_id,
conversation_id,
raw_count,
to_send.len(),
new_cursor,
cursor,
);
if !to_send.is_empty() {
cursor = new_cursor;
if tx
.send(PumpFrame::Activities {
activities: to_send,
next_watermark: new_cursor,
})
.await
.is_err()
{
eprintln!(
"[ws pump] tx.send failed (client gone) tenant={} conv={}",
tenant_id, conversation_id,
);
return Ok(());
}
}
}
Err(err) => {
eprintln!(
"[ws pump] live fetch FAILED tenant={} conv={} err={}",
tenant_id, conversation_id, err,
);
let _ = tx.send(PumpFrame::Error(err)).await;
}
}
}
eprintln!(
"[ws pump] events stream ended tenant={} conv={}",
tenant_id, conversation_id,
);
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::notifier::{InMemoryNotifier, NotifyEvent};
use serde_json::json;
use std::sync::Mutex;
struct FakeSource {
activities: Mutex<Vec<Value>>,
next_watermark: Mutex<u64>,
}
impl FakeSource {
fn new() -> Self {
Self {
activities: Mutex::new(vec![]),
next_watermark: Mutex::new(0),
}
}
fn append(&self, text: &str) -> u64 {
let mut wm = self.next_watermark.lock().unwrap();
let watermark = *wm;
*wm += 1;
self.activities.lock().unwrap().push(json!({
"type": "message",
"text": text,
"channelData": {"watermark": watermark}
}));
watermark
}
}
#[async_trait]
impl ActivitySource for FakeSource {
async fn fetch_since(
&self,
_tenant: &str,
_conv: &str,
since: u64,
) -> Result<(Vec<Value>, u64), String> {
let acts = self.activities.lock().unwrap();
let next = *self.next_watermark.lock().unwrap();
let filtered: Vec<Value> = acts
.iter()
.filter(|a| {
a.get("channelData")
.and_then(|cd| cd.get("watermark"))
.and_then(|w| w.as_u64())
.map(|w| w >= since)
.unwrap_or(false)
})
.cloned()
.collect();
Ok((filtered, next))
}
}
#[tokio::test]
async fn replay_returns_activities_above_initial_watermark() {
let source = Arc::new(FakeSource::new());
source.append("a");
source.append("b");
source.append("c");
let notifier = Arc::new(InMemoryNotifier::new(8));
let pump = Pump::new(source.clone(), notifier, 1000);
let (tx, mut rx) = tokio::sync::mpsc::channel(8);
let handle = tokio::spawn(async move { pump.run("t1".into(), "c1".into(), 0, tx).await });
let frame = rx.recv().await.expect("frame");
match frame {
PumpFrame::Activities {
activities,
next_watermark,
} => {
assert_eq!(activities.len(), 3);
assert_eq!(next_watermark, 3);
}
_ => panic!("unexpected frame"),
}
drop(rx);
handle.abort();
let _ = handle.await;
}
#[tokio::test]
async fn replay_too_large_returns_error() {
let source = Arc::new(FakeSource::new());
for i in 0..5 {
source.append(&format!("msg-{i}"));
}
let notifier = Arc::new(InMemoryNotifier::new(8));
let pump = Pump::new(source, notifier, 3);
let (tx, _rx) = tokio::sync::mpsc::channel(8);
let result = pump.run("t1".into(), "c1".into(), 0, tx).await;
assert!(matches!(
result,
Err(PumpError::ReplayTooLarge { count: 5, max: 3 })
));
}
#[tokio::test]
async fn live_event_triggers_fetch_and_send() {
let source = Arc::new(FakeSource::new());
let notifier: Arc<dyn ActivityNotifier> = Arc::new(InMemoryNotifier::new(8));
let pump = Pump::new(source.clone(), notifier.clone(), 1000);
let (tx, mut rx) = tokio::sync::mpsc::channel(8);
let handle = tokio::spawn(async move { pump.run("t1".into(), "c1".into(), 0, tx).await });
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
let wm = source.append("hello-live");
notifier
.publish(NotifyEvent {
tenant_id: "t1".into(),
conversation_id: "c1".into(),
new_watermark: wm + 1,
})
.await;
let frame = tokio::time::timeout(std::time::Duration::from_millis(500), rx.recv())
.await
.expect("timed out")
.expect("no frame");
match frame {
PumpFrame::Activities { activities, .. } => {
assert_eq!(activities.len(), 1);
assert_eq!(activities[0]["text"], "hello-live");
}
_ => panic!("unexpected"),
}
drop(rx);
handle.abort();
let _ = handle.await;
}
}