use std::sync::{Mutex, mpsc};
use std::time::{Duration, Instant};
use liminal::channel::SchemaId;
use liminal::conversation::{ConversationActor, ConversationPhase, ParticipantPid};
use liminal::envelope::{Envelope, PublisherId};
use liminal::protocol::{
CausalContext as ProtocolCausalContext, MessageEnvelope, SchemaId as ProtocolSchemaId,
};
use crate::ServerError;
pub trait ConversationResource: std::fmt::Debug + Send {
fn message(&self, envelope: &MessageEnvelope) -> Result<(), ServerError>;
fn participant_pids(&self) -> Vec<u64>;
fn has_detected_crash(&self) -> bool;
fn await_crash(&self, timeout: Duration) -> Option<Instant>;
fn receive_reply(&self, timeout: Duration) -> Result<MessageEnvelope, ServerError>;
fn close(self: Box<Self>) -> Result<(), ServerError>;
}
#[derive(Debug)]
pub struct ConnectionConversation {
resource: Box<dyn ConversationResource>,
}
impl ConnectionConversation {
#[must_use]
pub fn new(resource: Box<dyn ConversationResource>) -> Self {
Self { resource }
}
pub(super) fn message(&self, envelope: &MessageEnvelope) -> Result<(), ServerError> {
self.resource.message(envelope)
}
#[must_use]
pub fn participant_pids(&self) -> Vec<u64> {
self.resource.participant_pids()
}
#[must_use]
pub fn has_detected_crash(&self) -> bool {
self.resource.has_detected_crash()
}
#[must_use]
pub fn await_crash(&self, timeout: Duration) -> Option<Instant> {
self.resource.await_crash(timeout)
}
pub fn receive_reply(&self, timeout: Duration) -> Result<MessageEnvelope, ServerError> {
self.resource.receive_reply(timeout)
}
pub(super) fn close(self) -> Result<(), ServerError> {
self.resource.close()
}
}
#[derive(Debug)]
pub(super) struct LiminalConversationResource {
actor: ConversationActor,
participant: ParticipantPid,
exit_rx: Mutex<mpsc::Receiver<Instant>>,
crash_observed: Mutex<Option<Instant>>,
}
impl LiminalConversationResource {
pub(super) const fn new(
actor: ConversationActor,
participant: ParticipantPid,
exit_rx: mpsc::Receiver<Instant>,
) -> Self {
Self {
actor,
participant,
exit_rx: Mutex::new(exit_rx),
crash_observed: Mutex::new(None),
}
}
fn poll_exit_signal(&self) -> Option<Instant> {
if let Ok(cached) = self.crash_observed.lock() {
if let Some(instant) = *cached {
return Some(instant);
}
}
let received = self.exit_rx.lock().map_or(None, |rx| rx.try_recv().ok());
self.cache(received);
received
}
fn cache(&self, instant: Option<Instant>) {
if let Some(instant) = instant {
if let Ok(mut cached) = self.crash_observed.lock() {
*cached = Some(instant);
}
}
}
fn actor_phase_failed(&self) -> bool {
matches!(
self.actor.state().map(|state| state.current_phase),
Ok(ConversationPhase::Failed)
)
}
}
impl ConversationResource for LiminalConversationResource {
fn message(&self, envelope: &MessageEnvelope) -> Result<(), ServerError> {
if self.poll_exit_signal().is_some() || self.actor_phase_failed() {
return Err(ServerError::ListenerAccept {
message: format!(
"conversation participant {} crashed; message rejected",
self.participant.get()
),
});
}
let payload = envelope.payload.clone();
let message = Envelope::new(payload, None, SchemaId::new(), PublisherId::default());
self.actor
.handle()
.send(message)
.map_err(|error| ServerError::ListenerAccept {
message: format!("conversation message delivery failed: {error}"),
})
}
fn participant_pids(&self) -> Vec<u64> {
vec![self.participant.get()]
}
fn has_detected_crash(&self) -> bool {
self.poll_exit_signal().is_some() || self.actor_phase_failed()
}
fn await_crash(&self, timeout: Duration) -> Option<Instant> {
if let Some(instant) = self.poll_exit_signal() {
return Some(instant);
}
let received = self
.exit_rx
.lock()
.map_or(None, |rx| rx.recv_timeout(timeout).ok());
self.cache(received);
received
}
fn receive_reply(&self, timeout: Duration) -> Result<MessageEnvelope, ServerError> {
let reply =
self.actor
.receive_timeout(timeout)
.map_err(|error| ServerError::ListenerAccept {
message: format!("conversation reply receive failed: {error}"),
})?;
Ok(MessageEnvelope::new(
ProtocolSchemaId::new([0; ProtocolSchemaId::WIRE_LEN]),
ProtocolCausalContext::independent(),
reply.payload,
))
}
fn close(self: Box<Self>) -> Result<(), ServerError> {
let Self { actor, .. } = *self;
if matches!(
actor.state().map(|state| state.current_phase),
Ok(ConversationPhase::Failed)
) {
actor.handle().close().ok();
return Ok(());
}
actor
.handle()
.close()
.map_err(|error| ServerError::ListenerAccept {
message: format!("conversation close failed: {error}"),
})
}
}