use std::collections::HashMap;
use crate::broker::{
channel::{ChannelPongSender, ResponseSender, ResponseStreamSender},
model::{BroadcastId, BrokerControl},
};
use crate::error::Result;
use crate::model::{
incoming::{
ApiMessage, ChannelMessage, ConnectedMessage, IncomingMessage, IncomingMessageType,
NoteUpdatedMessage,
},
outgoing::OutgoingMessage,
ApiRequestId, ChannelId, SubNoteId,
};
use log::{info, warn};
use misskey_core::model::ApiResult;
use serde_json::value::{self, Value};
#[derive(Debug)]
struct ApiHandler {
message: OutgoingMessage,
sender: ResponseSender<ApiResult<Value>>,
}
#[derive(Debug)]
struct SubNoteHandler {
message: OutgoingMessage,
sender: ResponseStreamSender<Value>,
}
#[derive(Debug)]
struct ChannelHandler {
message: OutgoingMessage,
pong: Option<ChannelPongSender>,
sender: ResponseStreamSender<Value>,
}
#[derive(Debug)]
pub(crate) struct Handler {
api: HashMap<ApiRequestId, ApiHandler>,
sub_note: HashMap<SubNoteId, SubNoteHandler>,
channel: HashMap<ChannelId, ChannelHandler>,
broadcast: HashMap<&'static str, HashMap<BroadcastId, ResponseStreamSender<Value>>>,
}
impl Handler {
pub fn new() -> Handler {
Handler {
api: HashMap::new(),
sub_note: HashMap::new(),
channel: HashMap::new(),
broadcast: HashMap::new(),
}
}
pub fn restore_messages(&mut self) -> Vec<OutgoingMessage> {
let mut messages = Vec::new();
for ApiHandler { message, .. } in self.api.values() {
messages.push(message.clone());
}
for SubNoteHandler { message, .. } in self.sub_note.values() {
messages.push(message.clone());
}
for ChannelHandler { message, .. } in self.channel.values() {
messages.push(message.clone());
}
messages
}
pub fn control(&mut self, ctrl: BrokerControl) -> Option<OutgoingMessage> {
match ctrl {
BrokerControl::Api {
id,
endpoint,
data,
sender,
} => {
let message = OutgoingMessage::Api { id, endpoint, data };
let handler = ApiHandler {
message: message.clone(),
sender,
};
self.api.insert(id, handler);
Some(message)
}
BrokerControl::Connect {
id,
sender,
params,
name,
pong,
} => {
let message = OutgoingMessage::Connect {
channel: name,
id,
params,
pong: true,
};
let handler = ChannelHandler {
message: message.clone(),
sender,
pong: Some(pong),
};
self.channel.insert(id, handler);
Some(message)
}
BrokerControl::Channel { id, message } => {
Some(OutgoingMessage::Channel { id, message })
}
BrokerControl::Disconnect { id } => {
self.channel.remove(&id);
Some(OutgoingMessage::Disconnect { id })
}
BrokerControl::SubNote { id, sender } => {
let message = OutgoingMessage::SubNote { id: id.clone() };
let handler = SubNoteHandler {
message: message.clone(),
sender,
};
self.sub_note.insert(id, handler);
Some(message)
}
BrokerControl::UnsubNote { id } => {
self.sub_note.remove(&id);
Some(OutgoingMessage::UnsubNote { id })
}
BrokerControl::StartBroadcast { id, type_, sender } => {
self.broadcast
.entry(type_)
.or_insert_with(HashMap::new)
.insert(id, sender);
None
}
BrokerControl::StopBroadcast { id } => {
for senders in &mut self.broadcast.values_mut() {
if senders.remove(&id).is_some() {
break;
}
}
None
}
}
}
pub fn is_empty(&self) -> bool {
self.api.is_empty()
&& self.sub_note.is_empty()
&& self.channel.is_empty()
&& self.broadcast.values().all(|m| m.is_empty())
}
pub async fn handle(&mut self, msg: IncomingMessage) -> Result<()> {
match msg.type_ {
IncomingMessageType::Api(id) => {
if let Some(ApiHandler { sender, .. }) = self.api.remove(&id) {
let msg: ApiResult<ApiMessage> = value::from_value(msg.body)?;
sender.send(msg.map(|m| m.res));
} else {
warn!("unknown API response message with {:?}, skipping", id);
return Ok(());
}
}
IncomingMessageType::Channel => {
let ChannelMessage { id, message } = value::from_value(msg.body)?;
let ChannelHandler { sender, .. } = match self.channel.get_mut(&id) {
Some(x) => x,
None => {
warn!("unhandled channel message with {:?}, skipping", id);
return Ok(());
}
};
if sender.try_send(message).is_err() {
warn!("stale channel handler for {:?}, deleted", id);
self.channel.remove(&id);
}
}
IncomingMessageType::Connected => {
let ConnectedMessage { id } = value::from_value(msg.body)?;
let ChannelHandler { pong, .. } = match self.channel.get_mut(&id) {
Some(x) => x,
None => {
warn!("unhandled connected message with {:?}, skipping", id);
return Ok(());
}
};
if let Some(pong) = pong.take() {
pong.send();
} else {
info!("duplicated connected message with {:?}, skipping", id);
}
}
IncomingMessageType::NoteUpdated => {
let NoteUpdatedMessage { id, message } = value::from_value(msg.body)?;
let SubNoteHandler { sender, .. } = match self.sub_note.get_mut(&id) {
Some(x) => x,
None => {
warn!("unhandled subnote message with {:?}, skipping", id);
return Ok(());
}
};
if sender.try_send(message).is_err() {
warn!("stale subnote handler for {:?}, deleted", id);
self.sub_note.remove(&id);
}
}
IncomingMessageType::Other(type_) => {
let senders = match self.broadcast.get_mut(type_.as_str()) {
Some(x) => x,
None => {
info!("unhandled broadcast message {}, skipping", type_);
return Ok(());
}
};
let body = msg.body;
senders.retain(|id, sender| {
if sender.try_send(body.clone()).is_err() {
warn!("stale broadcast handler {}:{:?}, deleted", type_, id);
false
} else {
true
}
});
}
}
Ok(())
}
}