use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use tracing::debug;
use crate::control::state::SharedState;
use super::codec::RespValue;
use super::command::RespCommand;
use super::session::RespSession;
pub async fn handle_subscribe(
cmd: &RespCommand,
session: &RespSession,
state: &SharedState,
stream: &mut TcpStream,
) -> crate::Result<()> {
if cmd.argc() < 1 {
let resp = RespValue::err("ERR wrong number of arguments for 'subscribe' command");
let bytes = resp.to_bytes();
stream
.write_all(&bytes)
.await
.map_err(|e| crate::Error::Bridge {
detail: format!("RESP write: {e}"),
})?;
return Ok(());
}
let channels: Vec<String> = cmd
.args
.iter()
.filter_map(|a| std::str::from_utf8(a).ok().map(|s| s.to_string()))
.collect();
let mut subscription = state.change_stream.subscribe(None, Some(session.tenant_id));
for (i, channel) in channels.iter().enumerate() {
let confirm = RespValue::array(vec![
RespValue::bulk_str("subscribe"),
RespValue::bulk_str(channel),
RespValue::integer((i + 1) as i64),
]);
let bytes = confirm.to_bytes();
stream
.write_all(&bytes)
.await
.map_err(|e| crate::Error::Bridge {
detail: format!("RESP write: {e}"),
})?;
}
debug!(
channels = ?channels,
"RESP SUBSCRIBE: entering subscription mode"
);
loop {
match subscription.receiver.recv().await {
Ok(event) => {
if !channels.contains(&event.collection) {
continue;
}
let payload = serde_json::json!({
"op": format!("{:?}", event.operation),
"key": event.document_id,
"lsn": event.lsn.as_u64(),
"timestamp_ms": event.timestamp_ms,
})
.to_string();
let msg = RespValue::array(vec![
RespValue::bulk_str("message"),
RespValue::bulk_str(&event.collection),
RespValue::bulk(payload.into_bytes()),
]);
let bytes = msg.to_bytes();
if stream.write_all(&bytes).await.is_err() {
break; }
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
let msg = RespValue::array(vec![
RespValue::bulk_str("message"),
RespValue::bulk_str("__system"),
RespValue::bulk(format!("{{\"warning\":\"lagged {n} events\"}}").into_bytes()),
]);
let bytes = msg.to_bytes();
if stream.write_all(&bytes).await.is_err() {
break;
}
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
break; }
}
}
Ok(())
}
/// Handles the RESP `PUBLISH` command by forwarding the message to the topic registry.
///
/// Expects at least two arguments: the channel name (argument 0) and the message
/// (argument 1). The message is published through `state.topic_registry` with the
/// publisher label `"resp_client"`.
///
/// Returns a RESP error value when there are too few arguments, when either
/// argument cannot be read as a string, or when the registry rejects the publish.
/// On success the reply is the integer `1`.
///
/// NOTE(review): the success reply is the constant `1`; the sequence value returned
/// by the registry is discarded, so the reply does not reflect how many subscribers
/// actually received the message.
pub async fn handle_publish(
    cmd: &RespCommand,
    _session: &RespSession,
    state: &SharedState,
) -> RespValue {
    if cmd.argc() < 2 {
        return RespValue::err("ERR wrong number of arguments for 'publish' command");
    }

    // Refuse arguments that `arg_str` cannot yield (presumably non-UTF-8 bytes)
    // instead of silently publishing an empty message to an empty channel.
    let (channel, message) = match (cmd.arg_str(0), cmd.arg_str(1)) {
        (Some(channel), Some(message)) => (channel, message),
        (None, _) => {
            return RespValue::err("ERR invalid channel name for 'publish' command");
        }
        (_, None) => {
            return RespValue::err("ERR invalid message payload for 'publish' command");
        }
    };

    match state
        .topic_registry
        .publish(channel, message.to_string(), "resp_client")
    {
        Ok(_seq) => RespValue::integer(1),
        Err(e) => RespValue::err(format!("ERR publish failed: {e}")),
    }
}