use std::hash::Hash;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use crate::broker::channel::{ChannelPongSender, ResponseSender, ResponseStreamSender};
use crate::error::Error;
use crate::model::{ApiRequestId, ChannelId, SubNoteId};
use async_rwlock::RwLock;
use futures::future::{BoxFuture, Future, FutureExt};
use misskey_core::model::ApiResult;
use serde_json::Value;
use uuid::Uuid;
#[derive(Clone, PartialEq, Eq, Hash, Debug, Copy)]
pub(crate) struct BroadcastId(pub Uuid);
impl BroadcastId {
pub fn new() -> Self {
BroadcastId(Uuid::new_v4())
}
}
#[derive(Debug)]
pub(crate) enum BrokerControl {
Api {
id: ApiRequestId,
endpoint: &'static str,
data: Value,
sender: ResponseSender<ApiResult<Value>>,
},
Connect {
id: ChannelId,
name: &'static str,
params: Value,
sender: ResponseStreamSender<Value>,
pong: ChannelPongSender,
},
Channel {
id: ChannelId,
message: Value,
},
Disconnect {
id: ChannelId,
},
SubNote {
id: SubNoteId,
sender: ResponseStreamSender<Value>,
},
UnsubNote {
id: SubNoteId,
},
StartBroadcast {
id: BroadcastId,
type_: &'static str,
sender: ResponseStreamSender<Value>,
},
StopBroadcast {
id: BroadcastId,
},
}
#[derive(Debug, Clone)]
pub(crate) enum BrokerState {
Working,
Exited,
Dead(Error),
}
impl BrokerState {
pub fn dead(self) -> Option<Error> {
match self {
BrokerState::Working => None,
BrokerState::Exited => panic!("asked if broker is dead while it is already exited"),
BrokerState::Dead(e) => Some(e),
}
}
}
#[derive(Debug, Clone)]
pub(crate) struct SharedBrokerState(Arc<RwLock<BrokerState>>);
pub(crate) struct ReadBrokerState(BoxFuture<'static, BrokerState>);
impl Future for ReadBrokerState {
type Output = BrokerState;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<BrokerState> {
self.0.poll_unpin(cx)
}
}
impl SharedBrokerState {
pub fn working() -> SharedBrokerState {
SharedBrokerState(Arc::new(RwLock::new(BrokerState::Working)))
}
pub async fn set_exited(&self) {
let mut lock = self.0.write().await;
*lock = BrokerState::Exited;
}
pub async fn set_error(&self, err: Error) {
let mut lock = self.0.write().await;
*lock = BrokerState::Dead(err);
}
pub fn try_read(&self) -> Option<BrokerState> {
self.0.try_read().map(|lock| BrokerState::clone(&*lock))
}
pub fn read(&self) -> ReadBrokerState {
let p = Arc::clone(&self.0);
ReadBrokerState(Box::pin(async move {
let lock = p.read().await;
BrokerState::clone(&*lock)
}))
}
}