use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
use facet::Facet;
use vox_types::{
Conduit, ConduitRx, ConnectionRole, ConnectionSettings, Handler, HandshakeResult, Message,
MessageFamily, MessagePayload, Parity, Payload, ReplySink, RequestCall, RequestResponse,
SelfRef, Tx,
};
use crate::{BareConduit, DriverReplySink, memory_link_pair};
#[derive(Clone)]
pub(crate) struct TestLaneClient {
pub(crate) caller: crate::Caller,
pub(crate) connection: Option<crate::connection::ConnectionHandle>,
}
impl crate::FromVoxLane for TestLaneClient {
const SERVICE_NAME: &'static str = "Noop";
fn from_vox_lane(
caller: crate::Caller,
connection: Option<crate::connection::ConnectionHandle>,
) -> Self {
Self { caller, connection }
}
}
pub(crate) fn test_acceptor_handshake() -> HandshakeResult {
HandshakeResult {
role: ConnectionRole::Acceptor,
our_settings: ConnectionSettings {
parity: Parity::Even,
max_concurrent_requests: 64,
initial_channel_credit: 16,
},
peer_settings: ConnectionSettings {
parity: Parity::Odd,
max_concurrent_requests: 64,
initial_channel_credit: 16,
},
our_schema: vec![],
peer_schema: vec![],
peer_metadata: vox_types::metadata().str("vox-service", "Noop").build(),
peer_evidence: vox_types::PeerEvidence::none(),
peer_identity: vox_types::PeerIdentity::anonymous(),
}
}
pub(crate) fn test_initiator_handshake() -> HandshakeResult {
HandshakeResult {
role: ConnectionRole::Initiator,
our_settings: ConnectionSettings {
parity: Parity::Odd,
max_concurrent_requests: 64,
initial_channel_credit: 16,
},
peer_settings: ConnectionSettings {
parity: Parity::Even,
max_concurrent_requests: 64,
initial_channel_credit: 16,
},
our_schema: vec![],
peer_schema: vec![],
peer_metadata: vox_types::metadata().str("vox-service", "Noop").build(),
peer_evidence: vox_types::PeerEvidence::none(),
peer_identity: vox_types::PeerIdentity::anonymous(),
}
}
pub(crate) type MessageConduit = BareConduit<MessageFamily, crate::MemoryLink>;
pub(crate) fn message_conduit_pair() -> (MessageConduit, MessageConduit) {
let (a, b) = memory_link_pair(64);
(BareConduit::new(a), BareConduit::new(b))
}
pub(crate) struct DropPongConduit<C> {
inner: C,
}
impl<C> DropPongConduit<C> {
pub(crate) fn new(inner: C) -> Self {
Self { inner }
}
}
impl<C> Conduit for DropPongConduit<C>
where
C: Conduit<Msg = MessageFamily>,
C::Rx: Send,
{
type Msg = MessageFamily;
type Tx = C::Tx;
type Rx = DropPongRx<C::Rx>;
fn split(self) -> (Self::Tx, Self::Rx) {
let (tx, rx) = self.inner.split();
(tx, DropPongRx { inner: rx })
}
}
impl<C> crate::IntoConduit for DropPongConduit<C>
where
C: Conduit<Msg = MessageFamily>,
C::Rx: Send,
{
type Conduit = Self;
fn into_conduit(self) -> Self {
self
}
}
pub(crate) struct DropPongRx<Rx> {
inner: Rx,
}
impl<Rx> ConduitRx for DropPongRx<Rx>
where
Rx: ConduitRx<Msg = MessageFamily> + Send,
{
type Msg = MessageFamily;
type Error = Rx::Error;
async fn recv(&mut self) -> Result<Option<SelfRef<Message<'static>>>, Self::Error> {
loop {
let Some(msg) = self.inner.recv().await? else {
return Ok(None);
};
let msg_ref = msg.get();
if matches!(&msg_ref.payload, MessagePayload::Pong(_)) {
continue;
}
return Ok(Some(msg));
}
}
}
#[derive(Clone, Copy)]
pub(crate) struct EchoHandler;
impl Handler<DriverReplySink> for EchoHandler {
async fn handle(
&self,
call: SelfRef<RequestCall<'static>>,
reply: DriverReplySink,
_schemas: std::sync::Arc<vox_types::SchemaRecvTracker>,
) {
let call = call.get();
let args_bytes = match &call.args {
Payload::Encoded(bytes) => bytes,
_ => panic!("expected incoming payload"),
};
let result: u32 = vox_phon::from_slice(args_bytes).expect("deserialize args");
reply
.send_reply(RequestResponse {
ret: Payload::outgoing(&result),
schemas: Default::default(),
metadata: Default::default(),
})
.await;
}
}
#[derive(Clone)]
pub(crate) struct BlockingHandler {
pub(crate) was_cancelled: Arc<AtomicBool>,
}
impl Handler<DriverReplySink> for BlockingHandler {
async fn handle(
&self,
_call: SelfRef<RequestCall<'static>>,
reply: DriverReplySink,
_schemas: std::sync::Arc<vox_types::SchemaRecvTracker>,
) {
let was_cancelled = self.was_cancelled.clone();
let _reply = reply;
struct DropGuard(Arc<AtomicBool>);
impl Drop for DropGuard {
fn drop(&mut self) {
self.0.store(true, Ordering::SeqCst);
}
}
let _guard = DropGuard(was_cancelled);
std::future::pending::<()>().await;
}
}
use crate::connection::{LaneAcceptor, LaneRejection, LaneRequest, PendingLane};
pub(crate) struct EchoAcceptor;
impl LaneAcceptor for EchoAcceptor {
fn accept(&self, _request: &LaneRequest, connection: PendingLane) -> Result<(), LaneRejection> {
connection.handle_with(EchoHandler);
Ok(())
}
}
#[derive(Facet)]
pub(crate) struct SubscribeArgs {
pub(crate) updates: Tx<u32>,
}