use std::sync::Arc;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use tokio::time::Duration;
use mabi_core::Protocol;
use crate::device::{DeviceRegistry, DynDevicePort};
use crate::driver::{ProtocolDriverRegistry, ProtocolLaunchSpec};
use crate::service::{RuntimeError, RuntimeResult, ServiceHandle, ServiceSnapshot};
/// A hook that can wrap or replace a device port.
///
/// Layers are held behind `Arc<dyn DevicePortLayer>`, so implementations must
/// be `Send + Sync`.
pub trait DevicePortLayer: Send + Sync {
    /// Returns the port that should be used in place of `port`.
    ///
    /// `protocol` is the protocol associated with the port, if one was
    /// supplied by the caller.
    fn decorate(&self, protocol: Option<Protocol>, port: DynDevicePort) -> DynDevicePort;
}
/// Optional hooks and per-protocol configuration for a runtime session.
///
/// The default value has no device layers and no protocol configuration.
#[derive(Default, Clone)]
pub struct RuntimeExtensions {
    /// Device-port layers, kept in the order they were added.
    device_layers: Vec<Arc<dyn DevicePortLayer>>,
    /// JSON configuration values keyed by a protocol string.
    protocol_configs: std::collections::BTreeMap<String, JsonValue>,
}
impl RuntimeExtensions {
    /// Creates an empty set of extensions; equivalent to `Self::default()`.
    pub fn new() -> Self {
        Default::default()
    }

    /// Appends `layer` after every layer that was added before it.
    pub fn add_device_layer(&mut self, layer: Arc<dyn DevicePortLayer>) {
        self.device_layers.push(layer);
    }

    /// Stores `value` as the configuration for `protocol`.
    ///
    /// An existing entry under the same key is replaced.
    pub fn insert_protocol_config(&mut self, protocol: impl Into<String>, value: JsonValue) {
        let key = protocol.into();
        self.protocol_configs.insert(key, value);
    }

    /// Looks up the configuration stored for `protocol`, if any.
    pub fn protocol_config(&self, protocol: &str) -> Option<&JsonValue> {
        self.protocol_configs.get(protocol)
    }

    /// Runs `port` through every registered layer and returns the result.
    ///
    /// Layers are applied in the order they were added, each one receiving the
    /// output of the previous one. With no layers the port is returned as-is.
    pub fn decorate_device_port(
        &self,
        protocol: Option<Protocol>,
        port: DynDevicePort,
    ) -> DynDevicePort {
        self.device_layers
            .iter()
            .fold(port, |decorated, layer| layer.decorate(protocol, decorated))
    }
}
/// Serializable description of a runtime session.
///
/// Both fields fall back to their default value when absent from the
/// serialized input.
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
pub struct RuntimeSessionSpec {
    /// Launch specifications, one per service in the session.
    #[serde(default)]
    pub services: Vec<ProtocolLaunchSpec>,
    /// Optional readiness timeout. `readiness_duration` interprets this value
    /// as a number of milliseconds.
    #[serde(default)]
    pub readiness_timeout: Option<u64>,
}
impl RuntimeSessionSpec {
    /// Resolves the readiness timeout to use for this session.
    ///
    /// Returns `readiness_timeout` converted from milliseconds when it is set,
    /// and `fallback` otherwise.
    pub fn readiness_duration(&self, fallback: Duration) -> Duration {
        match self.readiness_timeout {
            Some(millis) => Duration::from_millis(millis),
            None => fallback,
        }
    }
}
/// A set of services built from a [`RuntimeSessionSpec`], together with the
/// devices those services registered.
pub struct RuntimeSession {
    /// The specification the session was built from.
    spec: RuntimeSessionSpec,
    /// Registry holding the device ports of every service in the session.
    devices: DeviceRegistry,
    /// One handle per service, in the order the services appear in the spec.
    handles: Vec<ServiceHandle>,
}
impl RuntimeSession {
    /// Builds a session with one service per entry in `spec.services`.
    ///
    /// For every launch spec the matching driver is looked up in `registry`
    /// and asked to build the service. The service's devices are then passed
    /// through the device-port layers in `extensions` and registered in the
    /// session-wide device registry.
    ///
    /// # Errors
    ///
    /// Returns an error when `spec.services` is empty, when a launch spec
    /// refers to a driver key that `registry` does not know, or when building
    /// a service or registering its devices fails.
    pub async fn new(
        spec: RuntimeSessionSpec,
        registry: &ProtocolDriverRegistry,
        extensions: RuntimeExtensions,
    ) -> RuntimeResult<Self> {
        if spec.services.is_empty() {
            return Err(RuntimeError::service(
                "runtime session requires at least one service",
            ));
        }

        let devices = DeviceRegistry::new();
        let mut handles = Vec::with_capacity(spec.services.len());
        for launch in &spec.services {
            let driver = registry.get(launch.key()).ok_or_else(|| {
                RuntimeError::service(format!("unknown protocol driver: {}", launch.key()))
            })?;
            let descriptor = driver.descriptor();
            let service = driver.build(launch.clone(), extensions.clone()).await?;
            // Prefer the protocol the service reports about itself and fall
            // back to the one declared by the driver's descriptor.
            let service_protocol = service.status().protocol.or(Some(descriptor.protocol));

            // Collect the service's devices in a scratch registry first so
            // that every port can be decorated before it is registered in the
            // session-wide registry.
            let service_devices = DeviceRegistry::new();
            service.register_devices(&service_devices)?;
            for (device_id, port) in service_devices.entries() {
                devices.register(
                    device_id,
                    extensions.decorate_device_port(service_protocol, port),
                );
            }

            handles.push(ServiceHandle::named(
                launch.service_name(&descriptor),
                service_protocol,
                service,
            ));
        }

        Ok(Self {
            spec,
            devices,
            handles,
        })
    }

    /// Spawns every service in order and waits for each one to become ready.
    ///
    /// The readiness timeout comes from the session spec, falling back to
    /// `fallback_readiness_timeout` when the spec does not set one.
    ///
    /// # Errors
    ///
    /// Returns the first error encountered: a spawn failure, a readiness
    /// error, or a service that reports a non-ready or terminal status. Before
    /// returning, every service spawned so far is stopped in reverse order,
    /// including the one whose readiness check failed.
    pub async fn start(&self, fallback_readiness_timeout: Duration) -> RuntimeResult<()> {
        let readiness_timeout = self.spec.readiness_duration(fallback_readiness_timeout);
        let mut started: Vec<&ServiceHandle> = Vec::with_capacity(self.handles.len());
        for handle in &self.handles {
            if let Err(error) = handle.spawn().await {
                self.stop_started(&started).await;
                return Err(error);
            }
            // Track the handle as soon as it has been spawned. If its
            // readiness check fails below, the rollback must stop it as well;
            // otherwise the spawned service would be left running.
            started.push(handle);

            let failure = match handle.readiness(readiness_timeout).await {
                Ok(status) if status.ready && !status.is_terminal() => continue,
                Ok(status) => RuntimeError::service(format!(
                    "service failed to become ready: {} ({:?})",
                    status.name, status.state
                )),
                Err(error) => error,
            };
            self.stop_started(&started).await;
            return Err(failure);
        }
        Ok(())
    }

    /// Stops the given handles in reverse order as a best-effort rollback.
    async fn stop_started(&self, started: &[&ServiceHandle]) {
        for handle in started.iter().rev() {
            // Errors are deliberately ignored: the caller is already reporting
            // the failure that triggered the rollback.
            let _ = handle.stop().await;
        }
    }

    /// Stops every service in reverse order of registration.
    ///
    /// All services are asked to stop even if some of them fail.
    ///
    /// # Errors
    ///
    /// If one or more services fail to stop, returns a single error whose
    /// message joins the individual error messages with `"; "`.
    pub async fn stop(&self) -> RuntimeResult<()> {
        let mut errors = Vec::new();
        for handle in self.handles.iter().rev() {
            if let Err(error) = handle.stop().await {
                errors.push(error.to_string());
            }
        }
        if errors.is_empty() {
            Ok(())
        } else {
            Err(RuntimeError::service(errors.join("; ")))
        }
    }

    /// Returns a clone of the session's device registry.
    ///
    /// NOTE(review): whether the clone shares state with the session's own
    /// registry depends on `DeviceRegistry`'s `Clone` impl - confirm there.
    pub fn devices(&self) -> DeviceRegistry {
        self.devices.clone()
    }

    /// Collects a snapshot from every service, in registration order.
    ///
    /// # Errors
    ///
    /// Returns the first snapshot error encountered; snapshots gathered before
    /// it are discarded.
    pub async fn snapshots(&self) -> RuntimeResult<Vec<ServiceSnapshot>> {
        let mut snapshots = Vec::with_capacity(self.handles.len());
        for handle in &self.handles {
            snapshots.push(handle.snapshot().await?);
        }
        Ok(snapshots)
    }

    /// Returns the service handles in registration order.
    pub fn handles(&self) -> &[ServiceHandle] {
        &self.handles
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use async_trait::async_trait;
    use tokio::time::Duration;
    use mabi_core::Protocol;
    use crate::device::DeviceRegistry;
    use crate::driver::{
        ProtocolDescriptor, ProtocolDriver, ProtocolDriverRegistry, ProtocolLaunchSpec,
    };
    use crate::service::{
        ManagedService, RuntimeResult, ServiceContext, ServiceSnapshot, ServiceState, ServiceStatus,
    };
    use crate::session::{RuntimeExtensions, RuntimeSession, RuntimeSessionSpec};

    /// Minimal service that only tracks its own status and registers no
    /// devices.
    struct NullService {
        status: parking_lot::RwLock<ServiceStatus>,
    }

    impl NullService {
        fn new() -> Self {
            Self {
                status: parking_lot::RwLock::new(ServiceStatus::new("null")),
            }
        }
    }

    #[async_trait]
    impl ManagedService for NullService {
        async fn start(&self, _context: &ServiceContext) -> RuntimeResult<()> {
            let mut status = self.status.write();
            status.state = ServiceState::Starting;
            Ok(())
        }

        async fn stop(&self, _context: &ServiceContext) -> RuntimeResult<()> {
            let mut status = self.status.write();
            status.state = ServiceState::Stopped;
            status.ready = false;
            Ok(())
        }

        async fn serve(&self, context: ServiceContext) -> RuntimeResult<()> {
            // The write guard lives in its own scope so it is released before
            // the await below.
            {
                let mut status = self.status.write();
                status.state = ServiceState::Running;
                status.ready = true;
            }
            context.cancellation_token().cancelled().await;
            let mut status = self.status.write();
            status.state = ServiceState::Stopped;
            status.ready = false;
            Ok(())
        }

        fn status(&self) -> ServiceStatus {
            self.status.read().clone()
        }

        async fn snapshot(&self) -> RuntimeResult<ServiceSnapshot> {
            let mut snapshot = ServiceSnapshot::new("null");
            snapshot.status = self.status();
            Ok(snapshot)
        }

        fn register_devices(&self, _registry: &DeviceRegistry) -> RuntimeResult<()> {
            Ok(())
        }
    }

    /// Driver registered under the key `"null"` that always builds a
    /// `NullService`.
    struct NullDriver;

    #[async_trait]
    impl ProtocolDriver for NullDriver {
        fn descriptor(&self) -> ProtocolDescriptor {
            ProtocolDescriptor {
                key: "null",
                display_name: "Null",
                protocol: Protocol::ModbusTcp,
                default_port: 0,
                description: "null driver",
            }
        }

        async fn build(
            &self,
            _spec: ProtocolLaunchSpec,
            _extensions: RuntimeExtensions,
        ) -> RuntimeResult<Arc<dyn ManagedService>> {
            Ok(Arc::new(NullService::new()))
        }
    }

    /// A single-service session can be built, started, snapshotted and
    /// stopped without error.
    #[tokio::test]
    async fn session_starts_and_stops_services() {
        let mut registry = ProtocolDriverRegistry::new();
        registry.register(NullDriver);
        let spec = RuntimeSessionSpec {
            services: vec![ProtocolLaunchSpec {
                protocol: "null".into(),
                name: Some("test-null".into()),
                config: serde_json::json!({}),
            }],
            readiness_timeout: Some(1_000),
        };
        let session = RuntimeSession::new(spec, &registry, RuntimeExtensions::default())
            .await
            .unwrap();
        session.start(Duration::from_secs(1)).await.unwrap();
        assert_eq!(session.snapshots().await.unwrap().len(), 1);
        session.stop().await.unwrap();
    }
}