use std::convert::TryFrom;
use tracing::{span, Level};
use dsf_core::prelude::*;
use dsf_core::net;
use dsf_core::service::publisher::{DataOptions, Publisher};
use dsf_rpc::{DataInfo, PublishInfo, PublishOptions};
use crate::daemon::Dsf;
use crate::error::Error;
use crate::io;
use crate::core::subscribers::SubscriptionKind;
impl<C> Dsf<C>
where
    C: io::Connector + Clone + Sync + Send + 'static,
{
    /// Generate a new data page for a known service, store it, and push it to
    /// the peers subscribed to that service.
    ///
    /// The service is looked up from `options.service`; `options.data` (if any)
    /// becomes the cleartext body of the page and `options.kind` its data kind.
    /// The returned [`PublishInfo`] carries the index of the generated page.
    ///
    /// The outcome of the push requests is not inspected: once the page has
    /// been stored this method returns `Ok` regardless of what the subscribers
    /// answered.
    ///
    /// # Errors
    ///
    /// - `Error::UnknownService` if no service matches the resolved identifier.
    /// - `Error::NoPrivateKey` if the service has no private key.
    /// - Any error propagated from resolving the identifier, storing the page,
    ///   or looking up the service's subscribers.
    ///
    /// # Panics
    ///
    /// Panics if the service lock is poisoned, if `publish_data` fails, or if a
    /// `DataInfo` cannot be built from the generated page.
    pub async fn publish(&mut self, options: PublishOptions) -> Result<PublishInfo, Error> {
        let span = span!(Level::DEBUG, "publish");

        // Everything in this block is synchronous. Both the span guard and the
        // service write lock are released at the end of the block, i.e. before
        // the `.await` below.
        let (info, addresses, req) = {
            // Holding the guard returned by `Span::enter` across an `.await`
            // leaves the span entered on the executor thread while this future
            // is suspended, so the guard is deliberately scoped to this block.
            let _enter = span.enter();

            let id = self.resolve_identifier(&options.service)?;
            let mut services = self.services();

            let service_arc = match services.find(&id) {
                Some(s) => s,
                None => {
                    error!("unknown service (id: {})", id);
                    return Err(Error::UnknownService);
                }
            };

            let mut service_inst = service_arc.write().expect("service lock poisoned");
            let service = &mut service_inst.service;

            // Only the presence of the private key is checked here; the key
            // itself is not used directly by this method.
            if service.private_key().is_none() {
                error!("no service private key (id: {})", id);
                return Err(Error::NoPrivateKey);
            }

            let body = match options.data {
                Some(d) => Body::Cleartext(d),
                None => Body::None,
            };
            let data_options = DataOptions {
                data_kind: options.kind.into(),
                body,
                ..Default::default()
            };

            info!("Generating data page");

            // NOTE(review): the encode buffer is a fixed 1024 bytes and a
            // failure of `publish_data` panics. Presumably an oversized body
            // ends up here; returning an error instead needs a conversion into
            // `Error` that is not visible from this file - confirm and replace.
            let mut buff = vec![0u8; 1024];
            let (n, mut page) = service
                .publish_data(data_options, &mut buff)
                .expect("failed to generate data page");
            // Keep the encoded bytes alongside the page object.
            page.raw = Some(buff[..n].to_vec());

            let info = PublishInfo {
                index: page.header().index(),
            };
            let service_id = service.id();

            info!("Storing data page");

            // NOTE(review): same panic-on-failure caveat as `publish_data`.
            let data_info = DataInfo::try_from(&page)
                .expect("failed to build data info from generated page");
            self.data().store_data(&data_info, &page)?;

            // Sync the updated service instance via the service manager.
            services.sync_inst(&service_inst);

            let req = net::Request::new(
                self.id(),
                rand::random(),
                net::RequestKind::PushData(service_id.clone(), vec![page]),
                Flags::default(),
            );

            // Only peer subscriptions whose peer is currently known contribute
            // an address; everything else is skipped.
            let subscriptions = self.subscribers().find(&service_id)?;
            let addresses: Vec<_> = subscriptions
                .iter()
                .filter_map(|s| {
                    if let SubscriptionKind::Peer(peer_id) = &s.info.kind {
                        self.peers().find(peer_id).map(|p| p.address())
                    } else {
                        None
                    }
                })
                .collect();

            (info, addresses, req)
        };

        // Log inside the span without keeping it entered over the await.
        span.in_scope(|| info!("Sending data push messages"));

        // Events emitted while this future is awaited are not attributed to
        // the "publish" span, as the span is no longer entered at this point.
        self.request_all(&addresses, req).await;

        Ok(info)
    }
}