pub mod parcel;
use crate::actors::engine::RillEngine;
use crate::actors::recorder::{Recorder, RecorderLink};
use crate::config::EngineConfig;
use anyhow::Error;
use async_trait::async_trait;
use meio::{
ActionHandler, Actor, Context, Eliminated, Id, IdOf, InstantActionHandler, InterruptedBy,
StartedBy, TaskEliminated, TaskError,
};
use meio_connect::{
client::{WsClient, WsClientStatus, WsSender},
WsIncoming,
};
use rill_protocol::flow::core;
use rill_protocol::io::provider::{
Description, ProviderProtocol, ProviderToServer, ServerToProvider,
};
use rill_protocol::io::transport::{Direction, Envelope, WideEnvelope};
use rill_protocol::pathfinder::{Pathfinder, Record};
use std::collections::HashMap;
use std::time::Duration;
/// Cloneable handle for sending `ProviderToServer` messages over the
/// WebSocket connection.
///
/// The default value holds no sender; in that state `response` only logs an
/// error instead of sending.
#[derive(Default, Clone)]
pub(crate) struct RillSender {
/// Outgoing half of the current connection, or `None` when no sender has
/// been set (or it has been reset).
sender: Option<WsSender<WideEnvelope<ProviderProtocol, ProviderToServer>>>,
}
impl RillSender {
    /// Stores `sender` as the active outgoing channel, replacing any
    /// previously stored one.
    fn set(&mut self, sender: WsSender<WideEnvelope<ProviderProtocol, ProviderToServer>>) {
        self.sender = Some(sender);
    }

    /// Forgets the stored sender (if any); subsequent `response` calls only
    /// log an error until `set` is called again.
    pub fn reset(&mut self) {
        self.sender = None;
    }

    /// Wraps `data` into a `WideEnvelope` addressed to `direction` and passes
    /// it to the stored sender.
    ///
    /// When no sender is stored the message is dropped and an error is logged.
    pub fn response(&mut self, direction: Direction<ProviderProtocol>, data: ProviderToServer) {
        match &self.sender {
            Some(active) => {
                active.send(WideEnvelope { direction, data });
            }
            None => {
                log::error!("Can't send a response. Not connected.");
            }
        }
    }
}
/// Groups used by `RillConnector` (its `Actor::GroupBy` type) to classify
/// the tasks and actors it supervises.
///
/// The connector's start handler passes these to `termination_sequence` in
/// the order `ActiveRequests`, `WsConnection`, `ParcelStream`, `Recorders`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Group {
/// Group the `WsClient` task is spawned into.
WsConnection,
/// NOTE(review): not assigned anywhere in this file's visible code —
/// presumably in-flight request handlers; confirm against other modules.
ActiveRequests,
/// NOTE(review): presumably used by the `parcel` submodule — confirm.
ParcelStream,
/// NOTE(review): presumably the group of `Recorder` actors — confirm.
Recorders,
}
/// Actor that owns the WebSocket connection to the node and routes incoming
/// requests to recorder links by path.
pub struct RillConnector {
/// Node URL taken from the config at construction; used in the actor name.
url: String,
/// Engine configuration the connector was created with.
config: EngineConfig,
/// Handle used to send messages to the server; empty until connected.
sender: RillSender,
/// Recorder links looked up by path.
recorders: Pathfinder<RecorderLink>,
/// Descriptions of registered recorders, keyed by the recorder's actor `Id`.
registered: HashMap<Id, Description>,
/// Description of this provider, sent in `ProviderToServer::Declare`
/// when a connection is established.
description: Description,
}
impl RillConnector {
    /// Creates a connector for the given engine configuration.
    ///
    /// The provider description is derived from the config's provider name
    /// (as the path) and provider type (as the stream type). The connector
    /// starts without a sender and with no recorders registered.
    pub fn new(config: EngineConfig) -> Self {
        let description = Description {
            path: config.provider_name().into(),
            stream_type: config.provider_type(),
        };
        // Read the URL before `config` is moved into the struct.
        let url = config.node_url();
        Self {
            url,
            config,
            sender: RillSender::default(),
            recorders: Pathfinder::default(),
            registered: HashMap::new(),
            description,
        }
    }

    /// Sends `msg` to the server using the broadcast direction.
    fn send_global(&mut self, msg: ProviderToServer) {
        let everyone = Direction::broadcast();
        self.sender.response(everyone, msg);
    }
}
/// Actor definition: tasks and children are grouped by [`Group`].
#[async_trait]
impl Actor for RillConnector {
    type GroupBy = Group;

    /// Returns the actor name, which embeds the node URL.
    fn name(&self) -> String {
        let url = &self.url;
        format!("RillConnector({})", url)
    }
}
/// Start-up of the connector when it is launched by `RillEngine`.
#[async_trait]
impl StartedBy<RillEngine> for RillConnector {
    /// Registers the termination order of the groups, attaches the
    /// distributor and spawns the WebSocket client task.
    ///
    /// Returns an error if attaching the distributor fails.
    async fn handle(&mut self, ctx: &mut Context<Self>) -> Result<(), Error> {
        let shutdown_order = vec![
            Group::ActiveRequests,
            Group::WsConnection,
            Group::ParcelStream,
            Group::Recorders,
        ];
        ctx.termination_sequence(shutdown_order);

        self.attach_distributor(ctx).await?;

        let node_url = self.config.node_url();
        // NOTE(review): presumably the reconnection interval of the client — confirm.
        let interval = Some(Duration::from_secs(1));
        let own_address = ctx.address().clone();
        let ws_client = WsClient::new(node_url, interval, own_address);
        ctx.spawn_task(ws_client, (), Group::WsConnection);

        Ok(())
    }
}
/// Interruption of the connector requested by `RillEngine`.
#[async_trait]
impl InterruptedBy<RillEngine> for RillConnector {
/// Detaches the distributor and then asks the context to shut down.
/// Always returns `Ok(())`.
async fn handle(&mut self, ctx: &mut Context<Self>) -> Result<(), Error> {
// `detach_distributor` is defined outside this part of the file.
self.detach_distributor();
ctx.shutdown();
Ok(())
}
}
/// Reaction to connection status changes reported by the WebSocket client.
#[async_trait]
impl InstantActionHandler<WsClientStatus<ProviderProtocol>> for RillConnector {
    /// Handles a connection status update.
    ///
    /// * `Connected` — stores the new sender, hands a clone of it to every
    ///   registered recorder that has a link, then declares this provider
    ///   to the server.
    /// * `Failed` — logs the reason, drops the now-stale sender and tells
    ///   every registered recorder that has a link that the connection is
    ///   gone.
    ///
    /// Failures to notify an individual recorder are ignored (best effort),
    /// so the handler always returns `Ok(())`.
    async fn handle(
        &mut self,
        status: WsClientStatus<ProviderProtocol>,
        _ctx: &mut Context<Self>,
    ) -> Result<(), Error> {
        match status {
            WsClientStatus::Connected { sender } => {
                self.sender.set(sender);
                for desc in self.registered.values_mut() {
                    let path = &desc.path;
                    let link = self.recorders.find_mut(path).and_then(Record::get_link_mut);
                    if let Some(link) = link {
                        // Best effort: a recorder that can't be reached is skipped.
                        link.connected(self.sender.clone()).await.ok();
                    }
                }
                let description = self.description.clone();
                let msg = ProviderToServer::Declare { description };
                self.send_global(msg);
            }
            WsClientStatus::Failed { reason } => {
                log::error!("Connection failed: {}", reason);
                // The stored sender belongs to the failed connection. Drop it
                // so the connector is in the same "not connected" state the
                // recorders are told about below, instead of keeping a dead
                // channel around until the next `Connected` status.
                self.sender.reset();
                for desc in self.registered.values_mut() {
                    let path = &desc.path;
                    let link = self.recorders.find_mut(path).and_then(Record::get_link_mut);
                    if let Some(link) = link {
                        // Best effort: a recorder that can't be reached is skipped.
                        link.disconnected().await.ok();
                    }
                }
            }
        }
        Ok(())
    }
}
#[async_trait]
impl ActionHandler<WsIncoming<Envelope<ProviderProtocol, ServerToProvider>>> for RillConnector {
async fn handle(
&mut self,
msg: WsIncoming<Envelope<ProviderProtocol, ServerToProvider>>,
_ctx: &mut Context<Self>,
) -> Result<(), Error> {
let envelope = msg.0;
log::trace!("Incoming request: {:?}", envelope);
let direct_id = envelope.direct_id;
let path = envelope.data.path;
let recorder_link = self
.recorders
.find_mut(&path)
.and_then(Record::get_link_mut);
if let Some(recorder) = recorder_link {
let request = envelope.data.request;
recorder.do_path_request(direct_id, request).await?;
} else {
log::warn!("Path not found: {:?}", path);
let msg = ProviderToServer::Error {
reason: format!("path {} not found", path),
};
self.sender.response(direct_id.into(), msg);
}
Ok(())
}
}
/// Notification that the `WsClient` task has finished.
#[async_trait]
impl TaskEliminated<WsClient<ProviderProtocol, Self>, ()> for RillConnector {
/// Currently does nothing: the task id, tag and result are all ignored
/// and `Ok(())` is returned.
async fn handle(
&mut self,
_id: IdOf<WsClient<ProviderProtocol, Self>>,
_tag: (),
_result: Result<(), TaskError>,
_ctx: &mut Context<Self>,
) -> Result<(), Error> {
Ok(())
}
}
/// Clean-up after a `Recorder` actor has terminated.
#[async_trait]
impl<T: core::Flow> Eliminated<Recorder<T>> for RillConnector {
    /// Removes the terminated recorder from the registry and takes its link
    /// out of the path index.
    ///
    /// Inconsistencies (recorder not registered, or registered without a
    /// link) are only logged; the handler always returns `Ok(())`.
    async fn handle(
        &mut self,
        id: IdOf<Recorder<T>>,
        _ctx: &mut Context<Self>,
    ) -> Result<(), Error> {
        let id: Id = id.into();
        match self.registered.remove(&id) {
            Some(desc) => {
                let taken = self
                    .recorders
                    .find_mut(&desc.path)
                    .and_then(Record::take_link);
                if taken.is_none() {
                    log::error!("Recorder {:?} was registered without a link (lost).", id);
                }
            }
            None => {
                log::error!("Recorder {:?} wasn't registered.", id);
            }
        }
        Ok(())
    }
}