use kitsune2_api::*;
use message_handler::PublishMessageHandler;
use std::sync::Arc;
use tokio::{
sync::mpsc::{channel, Receiver, Sender},
task::AbortHandle,
};
mod message_handler;
#[cfg(test)]
mod test;
/// Name under which this module's message handler is registered on the
/// transport and under which publish messages are sent.
pub const PUBLISH_MOD_NAME: &str = "Publish";
mod config {
    /// Configuration for the core publish module.
    ///
    /// Currently empty; reserved for future tuning options.
    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Default)]
    #[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
    #[serde(rename_all = "camelCase")]
    pub struct CorePublishConfig {}

    /// Module-level wrapper around [CorePublishConfig], in the shape
    /// read back via `builder.config.get_module_config()`.
    #[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)]
    #[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
    #[serde(rename_all = "camelCase")]
    pub struct CorePublishModConfig {
        /// The publish configuration entry for this module.
        pub core_publish: CorePublishConfig,
    }
}

pub use config::*;
/// Factory producing [CorePublish] instances.
#[derive(Debug)]
pub struct CorePublishFactory {}

impl CorePublishFactory {
    /// Construct a new [CorePublishFactory] behind a dynamic handle.
    pub fn create() -> DynPublishFactory {
        Arc::new(Self {})
    }
}
impl PublishFactory for CorePublishFactory {
    /// Write this module's default configuration into `config`.
    fn default_config(&self, config: &mut Config) -> K2Result<()> {
        config.set_module_config(&CorePublishModConfig::default())?;
        Ok(())
    }

    /// Nothing to validate: the publish configuration is currently empty.
    fn validate_config(&self, _config: &Config) -> K2Result<()> {
        Ok(())
    }

    /// Build a [CorePublish] instance from the module configuration
    /// stored in `builder`.
    fn create(
        &self,
        builder: Arc<Builder>,
        space_id: SpaceId,
        fetch: DynFetch,
        peer_store: DynPeerStore,
        transport: DynTransport,
    ) -> BoxFut<'static, K2Result<DynPublish>> {
        Box::pin(async move {
            let module_config: CorePublishModConfig =
                builder.config.get_module_config()?;
            let publish: DynPublish = Arc::new(CorePublish::new(
                module_config.core_publish,
                space_id,
                builder,
                fetch,
                peer_store,
                transport,
            ));
            Ok(publish)
        })
    }
}
// Op ids to publish, paired with the peer url to send them to.
type OutgoingPublishOps = (Vec<OpId>, Url);
// Op ids published to us, paired with the peer url to fetch them from.
type IncomingPublishOps = (Vec<OpId>, Url);
// Signed agent info to publish, paired with the peer url to send it to.
type OutgoingAgentInfo = (Arc<AgentInfoSigned>, Url);
// Encoded signed agent info received from a peer, decoded on receipt.
type IncomingAgentInfoEncoded = String;
/// Core implementation of the publish module.
///
/// Holds the sender halves of the outgoing publish queues plus the abort
/// handles of the background tasks that drain them.
#[derive(Debug)]
struct CorePublish {
    // Queue of op publishes destined for remote peers.
    outgoing_publish_ops_tx: Sender<OutgoingPublishOps>,
    // Queue of agent info publishes destined for remote peers.
    outgoing_publish_agent_tx: Sender<OutgoingAgentInfo>,
    // Abort handles for the spawned background tasks; aborted on drop.
    tasks: Vec<AbortHandle>,
}
impl CorePublish {
    /// Create a new [CorePublish], spawning all background tasks and
    /// registering the message handler (see [CorePublish::spawn_tasks]).
    fn new(
        config: CorePublishConfig,
        space_id: SpaceId,
        builder: Arc<Builder>,
        fetch: DynFetch,
        peer_store: DynPeerStore,
        transport: DynTransport,
    ) -> Self {
        Self::spawn_tasks(
            config, space_id, builder, fetch, peer_store, transport,
        )
    }
}
impl Publish for CorePublish {
    /// Queue op ids to be published to the peer at `target`.
    ///
    /// Best-effort: if the outgoing queue is full or closed, the failure
    /// is logged as a warning and `Ok(())` is still returned.
    fn publish_ops(
        &self,
        op_ids: Vec<OpId>,
        target: Url,
    ) -> BoxFut<'_, K2Result<()>> {
        Box::pin(async move {
            // Move `target` into the queue entry; the previous
            // `target.clone()` was redundant because `target` is not used
            // afterwards (and `publish_agent` already moves its target).
            if let Err(err) = self
                .outgoing_publish_ops_tx
                .send((op_ids, target))
                .await
            {
                tracing::warn!(
                    "could not insert ops into ops publish queue: {err}"
                );
            }
            Ok(())
        })
    }

    /// Queue a signed agent info to be published to the peer at `target`.
    ///
    /// Best-effort: if the outgoing queue is full or closed, the failure
    /// is logged as a warning and `Ok(())` is still returned.
    fn publish_agent(
        &self,
        agent_info: Arc<AgentInfoSigned>,
        target: Url,
    ) -> BoxFut<'_, K2Result<()>> {
        Box::pin(async move {
            if let Err(err) = self
                .outgoing_publish_agent_tx
                .send((agent_info, target))
                .await
            {
                tracing::warn!(
                    "could not insert signed agent info into agent publish queue: {err}"
                );
            }
            Ok(())
        })
    }
}
impl CorePublish {
    /// Create the publish queues, spawn the four background tasks that
    /// drive them, and register this module's message handler on the
    /// transport.
    ///
    /// Tasks spawned:
    /// - outgoing op publishes    -> sent to peers via the transport
    /// - incoming op publishes    -> forwarded to the fetch module
    /// - outgoing agent publishes -> sent to peers via the transport
    /// - incoming agent publishes -> decoded and inserted into the peer store
    pub fn spawn_tasks(
        _config: CorePublishConfig,
        space_id: SpaceId,
        builder: Arc<Builder>,
        fetch: DynFetch,
        peer_store: DynPeerStore,
        transport: DynTransport,
    ) -> Self {
        // Bounded queues: sends fail once 16_384 entries are pending,
        // which callers log as a warning rather than treating as fatal.
        let (outgoing_publish_ops_tx, outgoing_publish_ops_rx) =
            channel::<OutgoingPublishOps>(16_384);
        let (incoming_publish_ops_tx, incoming_publish_ops_rx) =
            channel::<IncomingPublishOps>(16_384);
        let (outgoing_publish_agent_tx, outgoing_publish_agent_rx) =
            channel::<OutgoingAgentInfo>(16_384);
        let (incoming_publish_agent_tx, incoming_publish_agent_rx) =
            channel::<IncomingAgentInfoEncoded>(16_384);
        let mut tasks = Vec::new();

        // The outgoing tasks hold only a weak transport reference so they
        // do not keep the transport alive; they exit once it is dropped.
        let outgoing_publish_ops_task =
            tokio::task::spawn(CorePublish::outgoing_publish_ops_task(
                outgoing_publish_ops_rx,
                space_id.clone(),
                Arc::downgrade(&transport),
            ))
            .abort_handle();
        tasks.push(outgoing_publish_ops_task);

        let incoming_publish_ops_task =
            tokio::task::spawn(CorePublish::incoming_publish_ops_task(
                incoming_publish_ops_rx,
                fetch,
            ))
            .abort_handle();
        tasks.push(incoming_publish_ops_task);

        let outgoing_publish_agent_task =
            tokio::task::spawn(CorePublish::outgoing_publish_agent_task(
                outgoing_publish_agent_rx,
                space_id.clone(),
                Arc::downgrade(&transport),
            ))
            .abort_handle();
        tasks.push(outgoing_publish_agent_task);

        let incoming_publish_agent_task =
            tokio::task::spawn(CorePublish::incoming_publish_agent_task(
                incoming_publish_agent_rx,
                peer_store,
                builder,
            ))
            .abort_handle();
        tasks.push(incoming_publish_agent_task);

        // Route incoming publish messages for this space into the
        // incoming queues drained by the tasks spawned above.
        let message_handler = Arc::new(PublishMessageHandler {
            incoming_publish_ops_tx,
            incoming_publish_agent_tx,
        });
        transport.register_module_handler(
            space_id.clone(),
            PUBLISH_MOD_NAME.to_string(),
            message_handler.clone(),
        );

        Self {
            outgoing_publish_ops_tx,
            outgoing_publish_agent_tx,
            tasks,
        }
    }
    /// Background task sending queued op publishes to remote peers.
    ///
    /// Runs until the outgoing queue closes or the transport is dropped.
    async fn outgoing_publish_ops_task(
        mut outgoing_publish_ops_rx: Receiver<OutgoingPublishOps>,
        space_id: SpaceId,
        transport: WeakDynTransport,
    ) {
        while let Some((op_ids, peer_url)) =
            outgoing_publish_ops_rx.recv().await
        {
            // Upgrade per message: this task must not keep the transport
            // alive, and it stops once the transport has been dropped.
            let Some(transport) = transport.upgrade() else {
                tracing::warn!("Transport dropped, stopping publish ops task");
                return;
            };
            // Clone so `op_ids` stays available for the error log below.
            let data = serialize_publish_ops_message(op_ids.clone());
            if let Err(err) = transport
                .send_module(
                    peer_url.clone(),
                    space_id.clone(),
                    PUBLISH_MOD_NAME.to_string(),
                    data,
                )
                .await
            {
                // Best-effort: a failed send is logged and dropped.
                tracing::warn!(
                    ?op_ids,
                    ?peer_url,
                    "could not send publish ops: {err}"
                );
            }
        }
    }
async fn incoming_publish_ops_task(
mut response_rx: Receiver<IncomingPublishOps>,
fetch: DynFetch,
) {
while let Some((op_ids, peer)) = response_rx.recv().await {
tracing::debug!(?peer, ?op_ids, "incoming publish ops");
if let Err(err) = fetch.request_ops(op_ids.clone(), peer).await {
tracing::warn!(
"could not insert publish ops request into fetch queue: {err}"
);
};
}
}
    /// Background task sending queued agent info publishes to remote
    /// peers.
    ///
    /// Runs until the outgoing queue closes or the transport is dropped.
    async fn outgoing_publish_agent_task(
        mut outgoing_publish_agent_rx: Receiver<OutgoingAgentInfo>,
        space_id: SpaceId,
        transport: WeakDynTransport,
    ) {
        while let Some((agent_info, peer_url)) =
            outgoing_publish_agent_rx.recv().await
        {
            // Upgrade per message: this task must not keep the transport
            // alive, and it stops once the transport has been dropped.
            let Some(transport) = transport.upgrade() else {
                tracing::warn!(
                    "Transport dropped, stopping publish agent task"
                );
                return;
            };
            match serialize_publish_agent_message(&agent_info) {
                Ok(data) => {
                    if let Err(err) = transport
                        .send_module(
                            peer_url.clone(),
                            space_id.clone(),
                            PUBLISH_MOD_NAME.to_string(),
                            data,
                        )
                        .await
                    {
                        // NOTE(review): logged at debug here but at warn
                        // in the ops task — confirm the lower level is
                        // intentional for failed agent publishes.
                        tracing::debug!(
                            ?agent_info,
                            ?peer_url,
                            "could not send publish agent: {err}"
                        );
                    }
                }
                Err(err) => {
                    tracing::warn!(
                        ?agent_info,
                        ?peer_url,
                        "Failed to serialize publish agent message: {err}"
                    )
                }
            };
        }
    }
    /// Background task handling agent infos published to us by peers:
    /// decode each one and insert it into the peer store.
    ///
    /// Runs until the incoming queue closes.
    async fn incoming_publish_agent_task(
        mut response_rx: Receiver<IncomingAgentInfoEncoded>,
        peer_store: DynPeerStore,
        builder: Arc<Builder>,
    ) {
        while let Some(agent_info_encoded) = response_rx.recv().await {
            // Decode using the builder's verifier (presumably this checks
            // the signature — confirm against AgentInfoSigned::decode).
            // Malformed or unverifiable infos are logged and skipped.
            match AgentInfoSigned::decode(
                &builder.verifier,
                agent_info_encoded.as_bytes(),
            ) {
                Ok(agent_info) => {
                    if let Err(err) = peer_store.insert(vec![agent_info]).await
                    {
                        tracing::warn!(
                            "could not insert published agent info into peer store: {err}"
                        );
                    }
                }
                Err(err) => {
                    tracing::warn!("Failed to decode signed agent info: {err}");
                }
            }
        }
    }
}
impl Drop for CorePublish {
    /// Abort every background task spawned by [CorePublish::spawn_tasks]
    /// when this instance goes away.
    fn drop(&mut self) {
        self.tasks.iter().for_each(|task| task.abort());
    }
}