use std::sync::Arc;
use tokio::sync::{broadcast, mpsc, oneshot};
use crate::vortix_core::engine::error::EngineError;
use crate::vortix_core::engine::event::EventEnvelope;
use crate::vortix_core::engine::fsm::Engine;
use crate::vortix_core::engine::input::{Input, UserCommand};
use crate::vortix_core::engine::state::Connection;
use crate::vortix_core::journal::Journal;
use crate::vortix_core::ports::tunnel::Tunnel;
/// Message sent from a handle to the engine actor over the command channel.
///
/// Every variant carries a one-shot `reply` sender on which the actor delivers
/// its answer.
enum Envelope {
/// Feed `input` to the engine; the actor replies with a [`CommandAck`].
Input {
input: Input,
reply: oneshot::Sender<Result<CommandAck, EngineError>>,
},
/// Request the current engine state plus the journal tail as a [`Snapshot`].
Snapshot {
reply: oneshot::Sender<Snapshot>,
},
}
/// Acknowledgement sent back to the caller once the actor has processed an input.
#[derive(Debug, Clone)]
pub struct CommandAck {
/// Number of events the engine returned for the input.
pub events_emitted: usize,
}
/// View of the engine assembled by the actor when it handles a snapshot request.
#[derive(Debug, Clone)]
pub struct Snapshot {
/// Clone of the engine's connection state at the time the request was handled.
pub state: Connection,
/// Value returned by the journal's `tail()` at that same point.
pub journal_tail: Vec<EventEnvelope>,
}
/// Result of subscribing to the engine: a snapshot paired with a live event receiver.
pub struct EngineSubscription {
/// Engine state and journal tail obtained from the actor.
pub snapshot: Snapshot,
/// Broadcast receiver obtained from the journal's `subscribe()`.
pub receiver: broadcast::Receiver<EventEnvelope>,
}
/// In-process handle to an engine actor.
///
/// Holds the sending half of the actor's command channel and a `Journal`
/// used to create event subscriptions.
#[derive(Clone)]
pub struct LocalHandle {
// Sending half of the channel whose receiver is drained by the actor loop.
command_tx: mpsc::Sender<Envelope>,
// Clone of the journal that was also handed to the actor.
journal: Journal,
}
// Hand-written `Debug`: only the journal is printed; the command sender is
// left out and the output is marked as non-exhaustive.
impl std::fmt::Debug for LocalHandle {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("LocalHandle");
        out.field("journal", &self.journal);
        out.finish_non_exhaustive()
    }
}
/// Public, cloneable handle through which callers talk to the engine.
///
/// Marked `#[non_exhaustive]`, so code outside this crate must not assume
/// `Local` is the only variant.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum EngineHandle {
/// Engine actor running in this process, reached through a [`LocalHandle`].
Local(LocalHandle),
}
impl EngineHandle {
    /// Starts an actor for `engine` on Tokio's blocking pool and returns a handle to it.
    ///
    /// The command channel is bounded at 64 pending envelopes. The actor receives
    /// its own clone of `journal`; the handle keeps the original for subscriptions.
    ///
    /// # Panics
    ///
    /// `tokio::task::spawn_blocking` requires a running Tokio runtime.
    pub fn local<T: Tunnel + Send + 'static>(engine: Engine<T>, journal: Journal) -> Self {
        let (command_tx, command_rx) = mpsc::channel::<Envelope>(64);
        let actor_journal = journal.clone();
        tokio::task::spawn_blocking(move || actor_loop(engine, actor_journal, command_rx));
        LocalHandle {
            command_tx,
            journal,
        }
        .into()
    }

    /// Sends `input` to the engine and waits for its acknowledgement.
    ///
    /// # Errors
    ///
    /// Propagates whatever the underlying handle reports, including the actor
    /// being gone.
    pub async fn execute(&self, input: Input) -> Result<CommandAck, EngineError> {
        match self {
            Self::Local(local) => local.execute(input).await,
        }
    }

    /// Convenience wrapper: wraps `cmd` in [`Input::UserCommand`] and executes it.
    ///
    /// # Errors
    ///
    /// Same as [`EngineHandle::execute`].
    pub async fn execute_command(&self, cmd: UserCommand) -> Result<CommandAck, EngineError> {
        let input = Input::UserCommand(cmd);
        self.execute(input).await
    }

    /// Requests the current engine state and journal tail.
    ///
    /// # Errors
    ///
    /// Fails when the actor can no longer be reached or drops the reply.
    pub async fn snapshot(&self) -> Result<Snapshot, EngineError> {
        match self {
            Self::Local(local) => local.snapshot().await,
        }
    }

    /// Returns a snapshot together with a receiver for journal events.
    ///
    /// # Errors
    ///
    /// Fails when the snapshot cannot be obtained from the actor.
    pub async fn subscribe(&self) -> Result<EngineSubscription, EngineError> {
        match self {
            Self::Local(local) => local.subscribe().await,
        }
    }

    /// Builds a handle backed by [`LocalHandle::for_test`].
    #[must_use]
    pub fn for_test() -> Self {
        Self::from(LocalHandle::for_test())
    }
}
impl From<LocalHandle> for EngineHandle {
fn from(h: LocalHandle) -> Self {
Self::Local(h)
}
}
impl LocalHandle {
    /// Sends `input` to the actor and waits for its acknowledgement.
    ///
    /// # Errors
    ///
    /// Returns `EngineError::Other` when the command channel is closed (the actor
    /// has stopped) or when the actor drops the reply sender without answering;
    /// otherwise returns whatever result the actor sent back.
    async fn execute(&self, input: Input) -> Result<CommandAck, EngineError> {
        let (reply, rx) = oneshot::channel();
        self.command_tx
            .send(Envelope::Input { input, reply })
            .await
            .map_err(|_| EngineError::Other("engine actor terminated".into()))?;
        // The actor replies with a `Result`, so the trailing `?` only unwraps the
        // channel error and the inner result becomes this function's return value.
        rx.await
            .map_err(|_| EngineError::Other("engine actor dropped reply".into()))?
    }

    /// Asks the actor for the current engine state and journal tail.
    ///
    /// # Errors
    ///
    /// Returns `EngineError::Other` when the command channel is closed or the
    /// actor drops the reply sender without answering.
    async fn snapshot(&self) -> Result<Snapshot, EngineError> {
        let (reply, rx) = oneshot::channel();
        self.command_tx
            .send(Envelope::Snapshot { reply })
            .await
            .map_err(|_| EngineError::Other("engine actor terminated".into()))?;
        rx.await
            .map_err(|_| EngineError::Other("engine actor dropped reply".into()))
    }

    /// Returns a snapshot together with a receiver for subsequent journal events.
    ///
    /// The receiver is created *before* the snapshot is requested. Creating it
    /// afterwards would leave a window in which events appended between the two
    /// steps appear neither in `snapshot.journal_tail` nor on the receiver. With
    /// this ordering no event is missed; the trade-off is that an event appended
    /// inside that window may show up in both places, so consumers that replay the
    /// stream on top of the snapshot should tolerate that overlap.
    ///
    /// # Errors
    ///
    /// Fails when the snapshot cannot be obtained from the actor.
    async fn subscribe(&self) -> Result<EngineSubscription, EngineError> {
        let receiver = self.journal.subscribe();
        let snapshot = self.snapshot().await?;
        Ok(EngineSubscription { snapshot, receiver })
    }

    /// Builds a self-contained handle for tests.
    ///
    /// Uses a journal opened with `disk: false`, a `MockTunnel`, and a profile
    /// lookup that fabricates a WireGuard profile for any requested id (config
    /// path `/tmp/<id>.conf`). The actor is spawned on Tokio's blocking pool
    /// with a command channel bounded at 64 envelopes.
    ///
    /// # Panics
    ///
    /// Panics if the journal cannot be opened. `tokio::task::spawn_blocking`
    /// additionally requires a running Tokio runtime.
    #[must_use]
    pub fn for_test() -> Self {
        use crate::vortix_core::ports::tunnel::mock::MockTunnel;
        use crate::vortix_core::profile::{Profile, ProfileId, ProtocolKind};
        use std::path::PathBuf;

        let journal = Journal::open(crate::vortix_core::journal::JournalConfig {
            disk: false,
            ..Default::default()
        })
        .expect("in-memory journal");
        let engine = Engine::new(MockTunnel::new(), |id: &ProfileId| {
            Some(Profile::new(
                id.clone(),
                id.as_str(),
                ProtocolKind::WireGuard,
                PathBuf::from(format!("/tmp/{}.conf", id.as_str())),
            ))
        });
        let (tx, rx) = mpsc::channel::<Envelope>(64);
        let journal_for_actor = journal.clone();
        tokio::task::spawn_blocking(move || actor_loop(engine, journal_for_actor, rx));
        Self {
            command_tx: tx,
            journal,
        }
    }
}
/// Drives the engine on the calling (blocking) thread until the command channel closes.
///
/// * `Envelope::Input`: the input is passed to `engine.handle`, every returned
///   event is appended to `journal`, and a [`CommandAck`] carrying the number of
///   returned events is sent back.
/// * `Envelope::Snapshot`: replies with a clone of the engine state and
///   `journal.tail()`.
///
/// The loop ends when `blocking_recv` yields `None`.
// The arguments are moved in from the spawning `move` closure, so the actor takes
// ownership of them even though `journal` is only used by reference here.
#[allow(clippy::needless_pass_by_value)]
fn actor_loop<T: Tunnel>(
    mut engine: Engine<T>,
    journal: Journal,
    mut rx: mpsc::Receiver<Envelope>,
) {
    while let Some(env) = rx.blocking_recv() {
        match env {
            Envelope::Input { input, reply } => {
                let events = engine.handle(input);
                let count = events.len();
                for ev in events {
                    // Best-effort append: the result is deliberately discarded, so
                    // `events_emitted` counts events produced by the engine.
                    let _ = journal.append(ev);
                }
                // The caller may have stopped waiting; a failed reply is not an error.
                let _ = reply.send(Ok(CommandAck {
                    events_emitted: count,
                }));
            }
            Envelope::Snapshot { reply } => {
                let snapshot = Snapshot {
                    state: engine.state().clone(),
                    journal_tail: journal.tail(),
                };
                // Same as above: ignore a requester that has gone away.
                let _ = reply.send(snapshot);
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::vortix_core::engine::input::UserCommand;
    use crate::vortix_core::profile::ProfileId;

    // A connect command against the test handle emits at least two events and
    // leaves the engine in the connected state.
    #[tokio::test]
    async fn for_test_handles_connect() {
        let engine = EngineHandle::for_test();
        let connect = UserCommand::Connect {
            profile_id: ProfileId::new("corp"),
        };
        let ack = engine.execute_command(connect).await.unwrap();
        assert!(ack.events_emitted >= 2);

        let snapshot = engine.snapshot().await.unwrap();
        assert!(matches!(snapshot.state, Connection::Connected { .. }));
    }

    // A fresh handle reports a disconnected snapshot and hands out a receiver.
    #[tokio::test]
    async fn subscribe_returns_snapshot_plus_receiver() {
        let engine = EngineHandle::for_test();
        let subscription = engine.subscribe().await.unwrap();
        assert!(matches!(
            subscription.snapshot.state,
            Connection::Disconnected { .. }
        ));
        let _ = subscription.receiver;
    }
}