#![cfg(all(feature = "service", feature = "introspection", feature = "idl-parse"))]
use std::{pin::pin, time::Duration};
use futures_util::{pin_mut, stream::StreamExt, TryStreamExt};
use serde::{Deserialize, Serialize};
use tokio::{select, time::sleep};
use zlink::{
introspect::{self, CustomType, ReplyError as _, Type},
notified::{self, traits::State as _},
unix::{bind, connect},
varlink_service::{self, Proxy as _},
};
/// End-to-end test of the `org.example.ftl` service.
///
/// Binds a server on [`SOCKET_PATH`] and races it against [`run_client`] on the
/// same task; the `select!` finishes as soon as either side completes, and any
/// error from either side fails the test.
///
/// # Errors
///
/// Returns an error if the stale socket cannot be removed, the socket cannot be
/// bound, or the server or the client reports a failure.
#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn ftl() -> Result<(), Box<dyn std::error::Error>> {
    // A socket file left behind by an earlier run must go before binding; a
    // missing file is the normal case and is not an error.
    match tokio::fs::remove_file(SOCKET_PATH).await {
        Ok(()) => {}
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
        Err(e) => return Err(e.into()),
    }

    // [0] is the service's initial state; [1] and [2] are set by the client.
    let conditions = [
        DriveCondition {
            state: DriveState::Idle,
            tylium_level: 100,
        },
        DriveCondition {
            state: DriveState::Spooling,
            tylium_level: 90,
        },
        DriveCondition {
            state: DriveState::Spooling,
            tylium_level: 90,
        },
    ];

    // Binding is I/O and can legitimately fail (permissions, path in use), so
    // report it through the test's `Result` instead of panicking.
    let listener = bind(SOCKET_PATH)?;
    let service = Ftl::new(conditions[0]);
    let server = zlink::Server::new(listener, service);

    select! {
        res = server.run() => res?,
        res = run_client(&conditions) => res?,
    }
    Ok(())
}
/// Client half of the `ftl` test.
///
/// Connects to the service on `SOCKET_PATH` and checks, in order: the
/// `org.varlink.service` introspection calls, `locate`, chained
/// `set_drive_condition` calls, chained `jump` calls (one failing, one
/// succeeding), a chain mixing `get_coordinates` with one-way
/// `reset_coordinates` calls, and finally the first item of the drive-condition
/// monitor stream opened at the start.
///
/// `conditions` must hold at least three entries; indices 1 and 2 are read here.
///
/// Returns an error when a connection or call fails at the transport level;
/// unexpected replies cause a panic via `unwrap`/`assert!`.
async fn run_client(conditions: &[DriveCondition]) -> Result<(), Box<dyn std::error::Error>> {
// Open the monitor stream first so it is already subscribed when the drive
// condition is changed further down; it is only read at the very end.
let mut conn = connect(SOCKET_PATH).await?;
let mut drive_monitor_stream = pin!(conn.get_drive_condition().await?);
{
// A second connection (shadowing the outer `conn`) carries all other calls;
// presumably the outer one stays borrowed by the monitor stream.
let mut conn = connect(SOCKET_PATH).await?;
// `GetInfo` must report the metadata declared on the service.
let info = conn.get_info().await?.map_err(|e| e.to_string())?;
assert_eq!(info.vendor, VENDOR);
assert_eq!(info.product, PRODUCT);
assert_eq!(info.version, VERSION);
assert_eq!(info.url, URL);
assert_eq!(info.interfaces, INTERFACES);
// The standard interface's description must parse and match the built-in one.
let interface = conn
.get_interface_description("org.varlink.service")
.await?
.map_err(|e| e.to_string())?;
let interface = interface.parse().unwrap();
assert_eq!(&interface, varlink_service::DESCRIPTION);
// The FTL interface's description must parse and list every service method.
let interface = conn
.get_interface_description("org.example.ftl")
.await?
.map_err(|e| e.to_string())?;
let interface = interface.parse().unwrap();
assert_eq!(interface.name(), "org.example.ftl");
let method_names: Vec<_> = interface.methods().map(|m| m.name()).collect();
assert!(method_names.contains(&"GetDriveCondition"));
assert!(method_names.contains(&"SetDriveCondition"));
assert!(method_names.contains(&"Jump"));
assert!(method_names.contains(&"Locate"));
assert!(method_names.contains(&"GetCoordinates"));
assert!(method_names.contains(&"ResetCoordinates"));
// Asking for an unknown interface must fail with `InterfaceNotFound`.
let error = conn
.get_interface_description("org.varlink.unimplemented")
.await
.unwrap_err();
let zlink::Error::VarlinkService(owned_error) = error else {
panic!("Expected VarlinkService error");
};
assert!(matches!(
owned_error.inner(),
varlink_service::Error::InterfaceNotFound { .. }
));
// `locate` echoes the target name back in the returned location.
let target = "Alpha Centauri";
let location = conn.locate(target).await.unwrap()?;
assert_eq!(location.name, target);
// Two chained `set_drive_condition` calls: expect one reply per call, in
// call order, each carrying the condition that was just set.
let replies = conn
.chain_set_drive_condition(conditions[1])?
.set_drive_condition(conditions[2])?
.send::<OwnedFtlReply, FtlError>()
.await?;
{
pin_mut!(replies);
let (reply, _fds) = replies.next().await.unwrap()?;
let reply = reply.unwrap();
let Some(OwnedFtlReply::DriveCondition(drive_condition)) = reply.into_parameters()
else {
panic!("Unexpected reply");
};
assert_eq!(drive_condition, conditions[1]);
let (reply, _fds) = replies.next().await.unwrap()?;
let reply = reply.unwrap();
let Some(OwnedFtlReply::DriveCondition(drive_condition)) = reply.into_parameters()
else {
panic!("Unexpected reply");
};
assert_eq!(drive_condition, conditions[2]);
assert!(replies.next().await.is_none());
}
{
// Two chained jumps. `Ftl::jump` rejects a jump whose `speed * duration`
// exceeds the tylium level, so the first speed is one more than the level of
// `conditions[1]` allows for this duration and the second is exactly at the
// limit.
let duration = 10;
let impossible_speed = conditions[1].tylium_level / duration + 1;
let replies = conn
.chain_jump(DriveConfiguration {
speed: impossible_speed,
trajectory: 1,
duration,
})?
.jump(DriveConfiguration {
speed: impossible_speed - 1,
trajectory: 1,
duration,
})?
.send::<OwnedFtlReply, FtlError>()
.await?;
pin_mut!(replies);
// First jump: rejected for lack of energy.
let (result, _fds) = replies.try_next().await?.unwrap();
let e = result.unwrap_err();
assert_eq!(e, FtlError::NotEnoughEnergy);
// Second jump: succeeds and reports the new coordinates.
let (reply, _fds) = replies.try_next().await?.unwrap();
let reply = reply?;
assert_eq!(
reply.parameters(),
Some(&OwnedFtlReply::Coordinates(Coordinate {
longitude: 1.0,
latitude: 0.0,
distance: 10,
}))
);
}
{
// Chain of four calls. `reset_coordinates` is declared `oneway` in
// `FtlProxy`; the assertions below expect exactly two replies, one for each
// `get_coordinates` call.
let replies = conn
.chain_get_coordinates()?
.reset_coordinates()?
.reset_coordinates()?
.get_coordinates()?
.send::<OwnedFtlReply, FtlError>()
.await?;
pin_mut!(replies);
// Before the resets: still the position reached by the successful jump.
let (reply, _fds) = replies.next().await.unwrap()?;
let reply = reply.unwrap();
let Some(OwnedFtlReply::Coordinates(coords)) = reply.into_parameters() else {
panic!("Expected Coordinates reply");
};
assert_eq!(
coords,
Coordinate {
longitude: 1.0,
latitude: 0.0,
distance: 10,
}
);
// After the resets: back at the origin.
let (reply, _fds) = replies.next().await.unwrap()?;
let reply = reply.unwrap();
let Some(OwnedFtlReply::Coordinates(coords)) = reply.into_parameters() else {
panic!("Expected Coordinates reply");
};
assert_eq!(
coords,
Coordinate {
longitude: 0.0,
latitude: 0.0,
distance: 0,
}
);
assert!(replies.next().await.is_none());
}
}
// The first item delivered on the monitor stream is expected to be the first
// condition set above.
let drive_cond = drive_monitor_stream.try_next().await?.unwrap()?;
let OwnedFtlReply::DriveCondition(condition) = drive_cond else {
panic!("Expected DriveCondition reply");
};
assert_eq!(condition, conditions[1]);
Ok(())
}
/// Client-side, deserialize-only counterpart of `Location`, used as a variant
/// payload of `OwnedFtlReply`.
#[derive(Debug, Clone, Deserialize, PartialEq)]
struct OwnedLocation {
/// Name of the located target.
name: String,
/// Coordinates reported for the target.
coordinates: Coordinate,
}
/// Any reply the client can receive from the `org.example.ftl` interface.
///
/// The enum is `untagged`: serde picks the first variant, in declaration order,
/// whose shape matches the reply parameters.
#[derive(Debug, Clone, Deserialize, PartialEq)]
#[serde(untagged)]
enum OwnedFtlReply {
/// Reply carrying a drive condition.
DriveCondition(DriveCondition),
/// Reply carrying coordinates.
Coordinates(Coordinate),
/// Reply carrying a named location.
Location(OwnedLocation),
}
// Client-side proxy for the `org.example.ftl` interface. Every call returns an
// outer `zlink::Result`; calls that expect a reply wrap an inner `Result` whose
// error type is `FtlError`. `run_client` also uses `chain_*` variants of these
// methods, which are not declared here (presumably generated by the macro).
#[zlink::proxy("org.example.ftl")]
trait FtlProxy {
// Declared `more`: returns a stream of replies instead of a single reply.
#[zlink(more)]
async fn get_drive_condition(
&mut self,
) -> zlink::Result<
impl futures_util::Stream<Item = zlink::Result<Result<OwnedFtlReply, FtlError>>>,
>;
// Looks up `target` and returns its location.
async fn locate(&mut self, target: &str) -> zlink::Result<Result<Location, FtlError>>;
// Sets the drive condition.
async fn set_drive_condition(
&mut self,
condition: DriveCondition,
) -> zlink::Result<Result<OwnedFtlReply, FtlError>>;
// Requests a jump with the given configuration.
async fn jump(
&mut self,
config: DriveConfiguration,
) -> zlink::Result<Result<OwnedFtlReply, FtlError>>;
// Reads the current coordinates.
async fn get_coordinates(&mut self) -> zlink::Result<Result<OwnedFtlReply, FtlError>>;
// Declared `oneway`: the return type carries no reply payload.
#[zlink(oneway)]
async fn reset_coordinates(&mut self) -> zlink::Result<()>;
}
/// State of the FTL service under test.
struct Ftl {
/// Current drive condition, held in a `notified::State` so that
/// `get_drive_condition` can hand out streams of it.
drive_condition: notified::State<DriveCondition, DriveCondition>,
/// Current position of the ship.
coordinates: Coordinate,
}
impl Ftl {
    /// Builds a service whose drive starts out in `init_conditions` and whose
    /// position is the origin (every coordinate component zero).
    fn new(init_conditions: DriveCondition) -> Self {
        let origin = Coordinate {
            longitude: 0.0,
            latitude: 0.0,
            distance: 0,
        };
        let drive_condition = notified::State::new(init_conditions);

        Self {
            drive_condition,
            coordinates: origin,
        }
    }
}
// Service-side implementation of the `org.example.ftl` interface. The
// attribute arguments supply the service metadata and the custom types used by
// the interface. Plain `//` comments are used inside the block so the macro
// sees exactly the same items as before.
#[zlink::service(
    interface = "org.example.ftl",
    vendor = "The FL project",
    product = "FTL-capable Spaceship \u{1F680}",
    version = "1",
    url = "https://want.ftl.now/",
    types = [DriveCondition, DriveConfiguration, Coordinate, Location]
)]
impl Ftl {
    // Streams the drive condition: a continuous stream when the caller asked
    // for `more`, otherwise a stream obtained from `stream_once`.
    #[zlink(more)]
    async fn get_drive_condition(&self, more: bool) -> notified::Stream<DriveCondition> {
        if more {
            self.drive_condition.stream()
        } else {
            self.drive_condition.stream_once()
        }
    }

    // Stores `condition` and returns the value now held by the state.
    async fn set_drive_condition(&mut self, condition: DriveCondition) -> DriveCondition {
        self.drive_condition.set(condition).await;
        self.drive_condition.get()
    }

    // Returns the current coordinates.
    async fn get_coordinates(&self) -> Coordinate {
        self.coordinates
    }

    // Performs a jump that costs `speed * duration` tylium.
    //
    // Errors:
    // - `ParameterOutOfRange` when `speed` or `duration` is negative, or when
    //   the travelled distance would not fit in an `i64`.
    // - `NotEnoughEnergy` when the cost exceeds the current tylium level.
    //
    // On success the ship moves by `trajectory` in longitude and `duration` in
    // distance, the drive becomes `Idle` with the cost deducted, and the new
    // coordinates are returned. Nothing is modified on failure.
    async fn jump(&mut self, config: DriveConfiguration) -> Result<Coordinate, FtlError> {
        // Both values come from the client. A negative product would pass the
        // energy check below and then *increase* the tylium level.
        if config.speed < 0 || config.duration < 0 {
            return Err(FtlError::ParameterOutOfRange);
        }

        let condition = self.drive_condition.get();

        // With non-negative factors, overflow means the cost exceeds every
        // representable tylium level, i.e. there can never be enough energy.
        let Some(tylium_required) = config.speed.checked_mul(config.duration) else {
            return Err(FtlError::NotEnoughEnergy);
        };
        if tylium_required > condition.tylium_level {
            return Err(FtlError::NotEnoughEnergy);
        }

        let current_coords = self.coordinates;
        // Validate the new distance before doing any work or touching state.
        let Some(distance) = current_coords.distance.checked_add(config.duration) else {
            return Err(FtlError::ParameterOutOfRange);
        };

        sleep(Duration::from_millis(1)).await;

        let coords = Coordinate {
            longitude: current_coords.longitude + config.trajectory as f32,
            latitude: current_coords.latitude,
            distance,
        };
        let new_condition = DriveCondition {
            state: DriveState::Idle,
            // Cannot underflow: 0 <= tylium_required <= tylium_level here.
            tylium_level: condition.tylium_level - tylium_required,
        };
        self.drive_condition.set(new_condition).await;
        self.coordinates = coords;
        Ok(coords)
    }

    // Returns a location for `target` whose coordinates are derived solely from
    // the byte length of the name.
    async fn locate(&self, target: String) -> Location {
        let coordinates = Coordinate {
            longitude: target.len() as f32 * 1.1,
            latitude: target.len() as f32 * 2.2,
            distance: target.len() as i64 * 10,
        };
        Location {
            name: target,
            coordinates,
        }
    }

    // Moves the ship back to the origin.
    async fn reset_coordinates(&mut self) {
        self.coordinates = Coordinate {
            longitude: 0.0,
            latitude: 0.0,
            distance: 0,
        };
    }
}
/// Condition of the FTL drive: its state plus the remaining tylium.
#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, CustomType)]
struct DriveCondition {
/// Current state of the drive.
state: DriveState,
/// Tylium available; `Ftl::jump` deducts `speed * duration` from it.
tylium_level: i64,
}
/// State of the FTL drive; variant names are serialized in `snake_case`.
#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Type)]
#[serde(rename_all = "snake_case")]
pub enum DriveState {
/// Initial state used by the test; also the state set after a successful jump.
Idle,
/// State the client sets through `set_drive_condition` in the test.
Spooling,
/// Not used anywhere in this file.
Busy,
}
/// Parameters of a jump request.
#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, CustomType)]
struct DriveConfiguration {
/// Multiplied by `duration` to obtain the tylium cost of the jump.
speed: i64,
/// Added to the ship's longitude by a successful jump.
trajectory: i64,
/// Added to the ship's distance by a successful jump.
duration: i64,
}
/// A position as reported by the service.
#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, CustomType)]
struct Coordinate {
/// Longitude component.
longitude: f32,
/// Latitude component.
latitude: f32,
/// Distance component.
distance: i64,
}
/// A named position, returned by `locate`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, CustomType)]
struct Location {
/// Name of the target that was looked up.
name: String,
/// Coordinates of the target.
coordinates: Coordinate,
}
/// Errors of the `org.example.ftl` interface.
///
/// Only `NotEnoughEnergy` is returned by the service code in this file; the
/// variants with fields are exercised by `reply_error_derive_works`.
#[derive(Debug, Clone, PartialEq, zlink::ReplyError, introspect::ReplyError)]
#[zlink(interface = "org.example.ftl")]
enum FtlError {
/// A jump needs more tylium than the drive currently holds.
NotEnoughEnergy,
/// A request parameter is outside the accepted range.
ParameterOutOfRange,
/// The given coordinates were rejected; `reason` says why.
InvalidCoordinates {
latitude: f32,
longitude: f32,
reason: String,
},
/// The system is overheating at the given temperature.
SystemOverheat {
temperature: i32,
},
}
impl core::fmt::Display for FtlError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
FtlError::NotEnoughEnergy => write!(f, "Not enough energy"),
FtlError::ParameterOutOfRange => write!(f, "Parameter out of range"),
FtlError::InvalidCoordinates {
latitude,
longitude,
reason,
} => {
write!(
f,
"Invalid coordinates ({}, {}): {}",
latitude, longitude, reason
)
}
FtlError::SystemOverheat { temperature } => {
write!(f, "System overheating at {} degrees", temperature)
}
}
}
}
// Marks `FtlError` as a standard error; `run_client` applies `?` to it in a
// function returning `Box<dyn std::error::Error>`.
impl std::error::Error for FtlError {}
/// Checks the introspection metadata exposed through `FtlError::VARIANTS`:
/// four variants in declaration order, the first two without fields, the last
/// two with their fields listed in declaration order.
#[test_log::test(tokio::test)]
async fn reply_error_derive_works() {
    let variants = FtlError::VARIANTS;
    assert_eq!(variants.len(), 4);

    // Unit variants.
    assert_eq!(variants[0].name(), "NotEnoughEnergy");
    assert!(variants[0].has_no_fields());
    assert_eq!(variants[1].name(), "ParameterOutOfRange");
    assert!(variants[1].has_no_fields());

    // `InvalidCoordinates { latitude, longitude, reason }`.
    assert_eq!(variants[2].name(), "InvalidCoordinates");
    assert!(!variants[2].has_no_fields());
    let coordinate_fields: Vec<_> = variants[2].fields().collect();
    let coordinate_field_names: Vec<_> = coordinate_fields.iter().map(|f| f.name()).collect();
    assert_eq!(coordinate_field_names, ["latitude", "longitude", "reason"]);

    // `SystemOverheat { temperature }`.
    assert_eq!(variants[3].name(), "SystemOverheat");
    assert!(!variants[3].has_no_fields());
    let overheat_fields: Vec<_> = variants[3].fields().collect();
    let overheat_field_names: Vec<_> = overheat_fields.iter().map(|f| f.name()).collect();
    assert_eq!(overheat_field_names, ["temperature"]);
}
/// Path of the Unix socket the test server binds and the clients connect to.
const SOCKET_PATH: &str = "/tmp/zlink-ftl.sock";
// The constants below are the values `run_client` expects from `GetInfo`; the
// first four repeat the arguments of the `#[zlink::service(...)]` attribute on
// `impl Ftl`.
const VENDOR: &str = "The FL project";
const PRODUCT: &str = "FTL-capable Spaceship \u{1F680}";
const VERSION: &str = "1";
const URL: &str = "https://want.ftl.now/";
/// Interface names `GetInfo` is expected to list, in this order.
const INTERFACES: [&str; 2] = ["org.example.ftl", "org.varlink.service"];