use kitsune2_api::*;
use message_handler::PublishMessageHandler;
use std::sync::Arc;
use tokio::{
sync::mpsc::{channel, Receiver, Sender},
task::AbortHandle,
};
mod message_handler;
#[cfg(test)]
mod test;
/// Module name under which publish messages are registered with the transport.
pub const PUBLISH_MOD_NAME: &str = "Publish";
mod config {
    /// Configuration for the core publish module.
    ///
    /// Currently empty; it exists so future options can be added without
    /// changing the shape of the module configuration.
    // NOTE: derive order unified with `CorePublishModConfig` below for
    // consistency (derive order has no semantic effect).
    #[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
    #[serde(rename_all = "camelCase")]
    pub struct CorePublishConfig {}

    /// Module-level wrapper that nests the publish configuration under the
    /// `corePublish` key when serialized.
    #[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
    #[serde(rename_all = "camelCase")]
    pub struct CorePublishModConfig {
        /// The core publish configuration.
        pub core_publish: CorePublishConfig,
    }
}
pub use config::*;
/// Factory that produces [`CorePublish`] instances for the publish module.
#[derive(Debug)]
pub struct CorePublishFactory {}

impl CorePublishFactory {
    /// Construct a new factory, type-erased behind the publish factory trait.
    pub fn create() -> DynPublishFactory {
        let factory = CorePublishFactory {};
        Arc::new(factory)
    }
}
impl PublishFactory for CorePublishFactory {
    /// Write this module's default configuration into `config`.
    fn default_config(&self, config: &mut Config) -> K2Result<()> {
        config.set_module_config(&CorePublishModConfig::default())
    }

    /// No validation needed: any `CorePublishConfig` is acceptable.
    fn validate_config(&self, _config: &Config) -> K2Result<()> {
        Ok(())
    }

    /// Build a [`CorePublish`] for `space_id`, reading the module
    /// configuration from `builder`.
    fn create(
        &self,
        builder: Arc<Builder>,
        space_id: SpaceId,
        fetch: DynFetch,
        transport: DynTransport,
    ) -> BoxFut<'static, K2Result<DynPublish>> {
        Box::pin(async move {
            let module_config: CorePublishModConfig =
                builder.config.get_module_config()?;
            let publish = CorePublish::new(
                module_config.core_publish,
                space_id,
                fetch,
                transport,
            );
            let publish: DynPublish = Arc::new(publish);
            Ok(publish)
        })
    }
}
/// Ops queued to be published to a remote peer: (op ids, target peer URL).
type OutgoingPublishOps = (Vec<OpId>, Url);
/// Ops announced to us by a remote peer: (op ids, source peer URL).
type IncomingPublishOps = (Vec<OpId>, Url);
/// Core implementation of the publish module.
#[derive(Debug)]
struct CorePublish {
// Sender side of the outgoing publish queue; drained by a background task.
outgoing_publish_ops_tx: Sender<OutgoingPublishOps>,
// Abort handles for the background tasks; aborted in `Drop`.
tasks: Vec<AbortHandle>,
}
impl CorePublish {
fn new(
config: CorePublishConfig,
space_id: SpaceId,
fetch: DynFetch,
transport: DynTransport,
) -> Self {
Self::spawn_tasks(config, space_id, fetch, transport)
}
}
impl Publish for CorePublish {
    /// Queue the given ops to be published to `target`.
    ///
    /// Publishing is asynchronous: the op ids are pushed onto an internal
    /// channel and delivered by a background task. If the channel is full or
    /// closed, the failure is logged and swallowed — this method always
    /// returns `Ok(())`.
    fn publish_ops(
        &self,
        op_ids: Vec<OpId>,
        target: Url,
    ) -> BoxFut<'_, K2Result<()>> {
        Box::pin(async move {
            // `target` is moved into the message here; the former
            // `target.clone()` was redundant since it was never used again.
            if let Err(err) = self
                .outgoing_publish_ops_tx
                .send((op_ids, target))
                .await
            {
                tracing::warn!(
                    "could not insert ops into ops publish queue: {err}"
                );
            }
            Ok(())
        })
    }
}
impl CorePublish {
    /// Spawn the background tasks that drive the publish protocol and
    /// register the incoming message handler with the transport.
    ///
    /// Two tasks are created:
    /// - an outgoing task that serializes queued op ids and sends them to
    ///   their target peer over the transport
    /// - an incoming task that forwards received publish announcements to
    ///   the fetch module
    pub fn spawn_tasks(
        _config: CorePublishConfig,
        space_id: SpaceId,
        fetch: DynFetch,
        transport: DynTransport,
    ) -> Self {
        // Channels connecting the public API / message handler to the tasks.
        let (outgoing_publish_ops_tx, outgoing_publish_ops_rx) =
            channel::<OutgoingPublishOps>(16_384);
        let (incoming_publish_ops_tx, incoming_publish_ops_rx) =
            channel::<IncomingPublishOps>(16_384);
        let mut tasks = Vec::new();

        // Task that sends outgoing publish messages to peers.
        let outgoing_publish_ops_task =
            tokio::task::spawn(CorePublish::outgoing_publish_ops_task(
                outgoing_publish_ops_rx,
                space_id.clone(),
                transport.clone(),
            ))
            .abort_handle();
        tasks.push(outgoing_publish_ops_task);

        // Task that hands incoming publish announcements to the fetch module.
        let incoming_publish_ops_task =
            tokio::task::spawn(CorePublish::incoming_publish_ops_task(
                incoming_publish_ops_rx,
                fetch,
            ))
            .abort_handle();
        tasks.push(incoming_publish_ops_task);

        // Register the handler so incoming publish messages for this space
        // are routed into the incoming task's channel.
        let message_handler = Arc::new(PublishMessageHandler {
            incoming_publish_ops_tx,
        });
        transport.register_module_handler(
            // Last uses of `space_id` and `message_handler`: moving them
            // avoids the redundant clones the previous version made here.
            space_id,
            PUBLISH_MOD_NAME.to_string(),
            message_handler,
        );

        Self {
            outgoing_publish_ops_tx,
            tasks,
        }
    }

    /// Background task: drain the outgoing queue and send each batch of op
    /// ids to its target peer as a publish module message.
    async fn outgoing_publish_ops_task(
        mut outgoing_publish_ops_rx: Receiver<OutgoingPublishOps>,
        space_id: SpaceId,
        transport: DynTransport,
    ) {
        // Terminates once all senders are dropped and the channel is drained.
        while let Some((op_ids, peer_url)) =
            outgoing_publish_ops_rx.recv().await
        {
            // Clone so `op_ids` stays available for the warning below.
            let data = serialize_publish_ops_message(op_ids.clone());
            if let Err(err) = transport
                .send_module(
                    peer_url.clone(),
                    space_id.clone(),
                    PUBLISH_MOD_NAME.to_string(),
                    data,
                )
                .await
            {
                // Best effort: log and continue with the next batch.
                tracing::warn!(
                    ?op_ids,
                    ?peer_url,
                    "could not send publish ops: {err}"
                );
            }
        }
    }

    /// Background task: forward published op ids received from peers to the
    /// fetch module, which will request the actual op data.
    async fn incoming_publish_ops_task(
        mut response_rx: Receiver<IncomingPublishOps>,
        fetch: DynFetch,
    ) {
        while let Some((op_ids, peer)) = response_rx.recv().await {
            tracing::debug!(?peer, ?op_ids, "incoming publish ops");
            // `op_ids` is moved here; the former `op_ids.clone()` was
            // redundant since the ids were never used after this call.
            if let Err(err) = fetch.request_ops(op_ids, peer).await {
                // Best effort: log and continue with the next announcement.
                tracing::warn!(
                    "could not insert publish ops request into fetch queue: {err}"
                );
            }
        }
    }
}
impl Drop for CorePublish {
    /// Abort all background tasks when the publish instance is dropped.
    fn drop(&mut self) {
        self.tasks.iter().for_each(|task| task.abort());
    }
}