use std::{
collections::VecDeque,
fmt,
sync::{
Arc, Condvar, Mutex, MutexGuard, Weak,
atomic::{AtomicBool, Ordering},
},
time::{Duration, Instant},
};
use futures::channel::oneshot;
#[cfg(feature = "cluster")]
use ractor::{ActorCell, BytesConvertable};
#[cfg(feature = "cluster")]
use std::{
sync::atomic::AtomicU64,
time::{SystemTime, UNIX_EPOCH},
};
use crate::stream::{
BoxStream, Cancellable, NotUsed, Sink, Source, StreamCancellation, StreamCompletion,
StreamError, StreamResult,
};
use super::{Actor, ActorProcessingErr, ActorRef, ActorResult, Message, block_on_ractor_runtime};
const DEFAULT_STREAM_REF_BUFFER_CAPACITY: usize = 32;
const DEFAULT_STREAM_REF_SUBSCRIPTION_TIMEOUT: Duration = Duration::from_secs(30);
const DEFAULT_STREAM_REF_DEMAND_REDELIVERY: Duration = Duration::from_secs(1);
const STREAM_REF_WAIT_POLL: Duration = Duration::from_millis(1);
#[cfg(feature = "cluster")]
const STREAM_REF_REMOTE_SCOPE: &str = "datum.streamrefs";
#[cfg(feature = "cluster")]
static STREAM_REF_REMOTE_ID: AtomicU64 = AtomicU64::new(1);
#[cfg(feature = "cluster")]
mod stream_ref_actor_bound {
pub trait Bound: ractor::BytesConvertable {}
impl<T> Bound for T where T: ractor::BytesConvertable {}
}
#[cfg(not(feature = "cluster"))]
mod stream_ref_actor_bound {
pub trait Bound {}
impl<T> Bound for T {}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StreamRefSettings {
buffer_capacity: usize,
subscription_timeout: Duration,
demand_redelivery_interval: Duration,
}
impl Default for StreamRefSettings {
fn default() -> Self {
Self {
buffer_capacity: DEFAULT_STREAM_REF_BUFFER_CAPACITY,
subscription_timeout: DEFAULT_STREAM_REF_SUBSCRIPTION_TIMEOUT,
demand_redelivery_interval: DEFAULT_STREAM_REF_DEMAND_REDELIVERY,
}
}
}
impl StreamRefSettings {
#[must_use]
pub fn buffer_capacity(&self) -> usize {
self.buffer_capacity
}
#[must_use]
pub fn subscription_timeout(&self) -> Duration {
self.subscription_timeout
}
#[must_use]
pub fn demand_redelivery_interval(&self) -> Duration {
self.demand_redelivery_interval
}
#[must_use]
pub fn with_buffer_capacity(mut self, capacity: usize) -> Self {
assert!(
capacity > 0,
"StreamRef buffer capacity must be greater than zero"
);
self.buffer_capacity = capacity;
self
}
#[must_use]
pub fn with_subscription_timeout(mut self, timeout: Duration) -> Self {
self.subscription_timeout = timeout;
self
}
#[must_use]
pub fn with_demand_redelivery_interval(mut self, interval: Duration) -> Self {
self.demand_redelivery_interval = interval;
self
}
}
#[cfg(feature = "cluster")]
#[derive(Debug, Clone, PartialEq, Eq)]
struct RemoteStreamRefEndpoint {
scope: String,
group: String,
}
#[cfg(feature = "cluster")]
impl RemoteStreamRefEndpoint {
fn new(kind: &str) -> Self {
let sequence = STREAM_REF_REMOTE_ID.fetch_add(1, Ordering::Relaxed);
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|duration| duration.as_nanos())
.unwrap_or_default();
Self {
scope: STREAM_REF_REMOTE_SCOPE.to_owned(),
group: format!(
"datum-streamref-{kind}-{}-{sequence}-{timestamp}",
std::process::id()
),
}
}
}
#[cfg(feature = "cluster")]
#[derive(Debug, Clone)]
enum RemoteRefEnvelope {
Ready {
endpoint: RemoteStreamRefEndpoint,
settings: StreamRefSettings,
},
Failed(StreamError),
}
#[cfg(feature = "cluster")]
enum RemoteSourceRefState {
Ready(RemoteStreamRefEndpoint),
Failed(StreamError),
}
#[cfg(feature = "cluster")]
enum RemoteSinkRefState {
Ready(RemoteStreamRefEndpoint),
Failed(StreamError),
}
#[cfg(feature = "cluster")]
impl BytesConvertable for RemoteStreamRefEndpoint {
fn into_bytes(self) -> Vec<u8> {
let mut bytes = Vec::new();
put_string(&mut bytes, self.scope);
put_string(&mut bytes, self.group);
bytes
}
fn from_bytes(bytes: Vec<u8>) -> Self {
let mut cursor = 0;
let scope = take_string(&bytes, &mut cursor);
let group = take_string(&bytes, &mut cursor);
Self { scope, group }
}
}
#[cfg(feature = "cluster")]
impl BytesConvertable for StreamRefSettings {
fn into_bytes(self) -> Vec<u8> {
let mut bytes = Vec::with_capacity(40);
put_u64(&mut bytes, self.buffer_capacity as u64);
put_duration(&mut bytes, self.subscription_timeout);
put_duration(&mut bytes, self.demand_redelivery_interval);
bytes
}
fn from_bytes(bytes: Vec<u8>) -> Self {
let mut cursor = 0;
let buffer_capacity = take_u64(&bytes, &mut cursor) as usize;
let subscription_timeout = take_duration(&bytes, &mut cursor);
let demand_redelivery_interval = take_duration(&bytes, &mut cursor);
Self {
buffer_capacity,
subscription_timeout,
demand_redelivery_interval,
}
}
}
#[cfg(feature = "cluster")]
impl BytesConvertable for StreamError {
fn into_bytes(self) -> Vec<u8> {
let mut bytes = Vec::new();
match self {
StreamError::Cancelled => put_u8(&mut bytes, 0),
StreamError::AbruptTermination => put_u8(&mut bytes, 1),
StreamError::Backpressured => put_u8(&mut bytes, 2),
StreamError::EmptyStream => put_u8(&mut bytes, 3),
StreamError::MaybeIncomplete => put_u8(&mut bytes, 4),
StreamError::LimitExceeded { max } => {
put_u8(&mut bytes, 5);
put_u64(&mut bytes, max);
}
StreamError::InvalidPortOperation {
operation,
port,
reason,
} => {
put_u8(&mut bytes, 6);
put_string(&mut bytes, operation.to_owned());
put_string(&mut bytes, port);
put_string(&mut bytes, reason);
}
StreamError::GraphValidation(message) => {
put_u8(&mut bytes, 7);
put_string(&mut bytes, message);
}
StreamError::EventLimitExceeded { limit } => {
put_u8(&mut bytes, 8);
put_u64(&mut bytes, limit as u64);
}
StreamError::ActorAskTimeout { timeout } => {
put_u8(&mut bytes, 9);
put_duration(&mut bytes, timeout);
}
StreamError::ActorTerminated => put_u8(&mut bytes, 10),
StreamError::ActorAskResponseDropped => put_u8(&mut bytes, 11),
StreamError::ActorAskSendFailed { reason } => {
put_u8(&mut bytes, 12);
put_string(&mut bytes, reason);
}
StreamError::ActorAskTaskFailed { reason } => {
put_u8(&mut bytes, 13);
put_string(&mut bytes, reason);
}
StreamError::Failed(message) => {
put_u8(&mut bytes, 14);
put_string(&mut bytes, message);
}
}
bytes
}
fn from_bytes(bytes: Vec<u8>) -> Self {
let mut cursor = 0;
match take_u8(&bytes, &mut cursor) {
0 => StreamError::Cancelled,
1 => StreamError::AbruptTermination,
2 => StreamError::Backpressured,
3 => StreamError::EmptyStream,
4 => StreamError::MaybeIncomplete,
5 => StreamError::LimitExceeded {
max: take_u64(&bytes, &mut cursor),
},
6 => StreamError::InvalidPortOperation {
operation: remote_port_operation(take_string(&bytes, &mut cursor)),
port: take_string(&bytes, &mut cursor),
reason: take_string(&bytes, &mut cursor),
},
7 => StreamError::GraphValidation(take_string(&bytes, &mut cursor)),
8 => StreamError::EventLimitExceeded {
limit: take_u64(&bytes, &mut cursor) as usize,
},
9 => StreamError::ActorAskTimeout {
timeout: take_duration(&bytes, &mut cursor),
},
10 => StreamError::ActorTerminated,
11 => StreamError::ActorAskResponseDropped,
12 => StreamError::ActorAskSendFailed {
reason: take_string(&bytes, &mut cursor),
},
13 => StreamError::ActorAskTaskFailed {
reason: take_string(&bytes, &mut cursor),
},
14 => StreamError::Failed(take_string(&bytes, &mut cursor)),
_ => StreamError::Failed("invalid remote stream error encoding".to_owned()),
}
}
}
#[cfg(feature = "cluster")]
impl BytesConvertable for RemoteRefEnvelope {
fn into_bytes(self) -> Vec<u8> {
let mut bytes = Vec::new();
match self {
Self::Ready { endpoint, settings } => {
put_u8(&mut bytes, 0);
put_bytes(&mut bytes, endpoint.into_bytes());
put_bytes(&mut bytes, settings.into_bytes());
}
Self::Failed(error) => {
put_u8(&mut bytes, 1);
put_bytes(&mut bytes, error.into_bytes());
}
}
bytes
}
fn from_bytes(bytes: Vec<u8>) -> Self {
let mut cursor = 0;
match take_u8(&bytes, &mut cursor) {
0 => {
let endpoint = RemoteStreamRefEndpoint::from_bytes(take_bytes(&bytes, &mut cursor));
let settings = StreamRefSettings::from_bytes(take_bytes(&bytes, &mut cursor));
Self::Ready { endpoint, settings }
}
1 => Self::Failed(StreamError::from_bytes(take_bytes(&bytes, &mut cursor))),
_ => Self::Failed(StreamError::Failed(
"invalid remote stream ref encoding".to_owned(),
)),
}
}
}
pub struct StreamRefs;
impl StreamRefs {
#[must_use]
pub fn source_ref<T>() -> Sink<T, SourceRef<T>>
where
T: Send + 'static,
{
Self::source_ref_with_settings(StreamRefSettings::default())
}
#[must_use]
pub fn source_ref_with_settings<T>(settings: StreamRefSettings) -> Sink<T, SourceRef<T>>
where
T: Send + 'static,
{
stream_ref_source_sink(settings)
}
#[must_use]
pub fn sink_ref<T>() -> Source<T, SinkRef<T>>
where
T: Send + 'static,
{
Self::sink_ref_with_settings(StreamRefSettings::default())
}
#[must_use]
pub fn sink_ref_with_settings<T>(settings: StreamRefSettings) -> Source<T, SinkRef<T>>
where
T: Send + 'static,
{
stream_ref_sink_source(settings)
}
}
pub struct SourceRef<T> {
inner: Arc<SourceRefInner<T>>,
}
struct SourceRefInner<T> {
#[allow(dead_code)]
producer: LazyProducerStatus<T>,
direct: Arc<SourceRefDirectState<T>>,
settings: StreamRefSettings,
subscribed: AtomicBool,
timeout: Option<Cancellable>,
#[cfg(feature = "cluster")]
remote: Option<RemoteSourceRefState>,
}
impl<T> Clone for SourceRef<T> {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
}
}
}
impl<T> fmt::Debug for SourceRef<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SourceRef").finish_non_exhaustive()
}
}
#[cfg(feature = "cluster")]
impl<T> BytesConvertable for SourceRef<T>
where
T: BytesConvertable + Send + 'static,
{
fn into_bytes(self) -> Vec<u8> {
match self.activate_remote_producer() {
Ok(endpoint) => RemoteRefEnvelope::Ready {
endpoint,
settings: self.inner.settings,
},
Err(error) => RemoteRefEnvelope::Failed(error),
}
.into_bytes()
}
fn from_bytes(bytes: Vec<u8>) -> Self {
match RemoteRefEnvelope::from_bytes(bytes) {
RemoteRefEnvelope::Ready { endpoint, settings } => {
Self::remote(settings, RemoteSourceRefState::Ready(endpoint))
}
RemoteRefEnvelope::Failed(error) => Self::remote(
StreamRefSettings::default(),
RemoteSourceRefState::Failed(error),
),
}
}
}
impl<T> SourceRef<T>
where
T: stream_ref_actor_bound::Bound + Send + 'static,
{
#[must_use]
pub fn source(&self) -> Source<T, NotUsed> {
let inner = Arc::clone(&self.inner);
Source::from_materialized_factory(move |_materializer| {
if inner.subscribed.swap(true, Ordering::SeqCst) {
return Ok((
failed_once("source ref has already been materialized"),
NotUsed,
));
}
#[cfg(feature = "cluster")]
if let Some(remote) = &inner.remote {
return match remote {
RemoteSourceRefState::Ready(endpoint) => {
match materialize_remote_source_ref(
endpoint.clone(),
inner.settings,
Arc::clone(&inner),
) {
Ok(stream) => Ok((stream, NotUsed)),
Err(error) => Ok((failed_stream(error), NotUsed)),
}
}
RemoteSourceRefState::Failed(error) => {
Ok((failed_stream(error.clone()), NotUsed))
}
};
}
if let Some(timeout) = &inner.timeout {
timeout.cancel();
}
match inner.direct.claim_input() {
Ok(input) => Ok((
Box::new(SourceRefDirectStream::new(
input,
inner.settings,
Arc::clone(&inner),
)) as BoxStream<T>,
NotUsed,
)),
Err(error) => Ok((failed_stream(error), NotUsed)),
}
})
}
}
pub(super) fn proto_source<T>(source_ref: &SourceRef<T>) -> Source<T, NotUsed>
where
T: Send + 'static,
{
let inner = Arc::clone(&source_ref.inner);
Source::from_materialized_factory(move |_materializer| {
if inner.subscribed.swap(true, Ordering::SeqCst) {
return Ok((
failed_stream(StreamError::Failed(
"source ref has already been materialized".to_owned(),
)),
NotUsed,
));
}
if let Some(timeout) = &inner.timeout {
timeout.cancel();
}
match inner.direct.claim_input() {
Ok(input) => Ok((
Box::new(SourceRefDirectStream::new(
input,
inner.settings,
Arc::clone(&inner),
)) as BoxStream<T>,
NotUsed,
)),
Err(error) => Ok((failed_stream(error), NotUsed)),
}
})
}
#[cfg(feature = "cluster")]
impl<T> SourceRef<T>
where
T: BytesConvertable + Send + 'static,
{
fn remote(settings: StreamRefSettings, remote: RemoteSourceRefState) -> Self {
let direct = Arc::new(SourceRefDirectState::terminal(StreamError::ActorTerminated));
Self {
inner: Arc::new(SourceRefInner {
producer: LazyProducerStatus::new(Arc::clone(&direct)),
direct,
settings,
subscribed: AtomicBool::new(false),
timeout: None,
remote: Some(remote),
}),
}
}
fn activate_remote_producer(&self) -> StreamResult<RemoteStreamRefEndpoint> {
if let Some(RemoteSourceRefState::Ready(endpoint)) = &self.inner.remote {
return Ok(endpoint.clone());
}
if let Some(RemoteSourceRefState::Failed(error)) = &self.inner.remote {
return Err(error.clone());
}
if self.inner.subscribed.swap(true, Ordering::SeqCst) {
return Err(StreamError::Failed(
"source ref has already been materialized".to_owned(),
));
}
if let Some(timeout) = &self.inner.timeout {
timeout.cancel();
}
let input = self.inner.direct.claim_input()?;
let shared = Arc::new(ProducerShared::new());
let endpoint = RemoteStreamRefEndpoint::new("source");
let (actor, _handle) = spawn_producer_actor(
None,
Arc::clone(&shared),
self.inner.settings,
Some(endpoint.clone()),
)?;
join_remote_endpoint(&endpoint, &actor);
let guard_actor = actor.clone();
let producer_endpoint = ProducerEndpointHandle::Actor {
actor,
shared: Arc::clone(&shared),
endpoint: Some(endpoint.clone()),
};
let cancelled = Arc::new(AtomicBool::new(false));
let settings = self.inner.settings;
let guard = ProducerEndpointDropGuard::new(guard_actor, None);
std::thread::spawn(move || {
let _guard = guard;
let _ = run_producer_endpoint(input, shared, producer_endpoint, settings, cancelled);
});
Ok(endpoint)
}
}
pub struct SinkRef<T> {
inner: Arc<SinkRefInner<T>>,
}
struct SinkRefInner<T> {
direct: Arc<SinkRefDirectState<T>>,
#[cfg(feature = "cluster")]
settings: StreamRefSettings,
subscribed: AtomicBool,
#[cfg(feature = "cluster")]
remote: Option<RemoteSinkRefState>,
}
impl<T> Clone for SinkRef<T> {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
}
}
}
impl<T> fmt::Debug for SinkRef<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SinkRef").finish_non_exhaustive()
}
}
#[cfg(feature = "cluster")]
impl<T> BytesConvertable for SinkRef<T>
where
T: BytesConvertable + Send + 'static,
{
fn into_bytes(self) -> Vec<u8> {
match self.activate_remote_consumer() {
Ok(endpoint) => RemoteRefEnvelope::Ready {
endpoint,
settings: self.inner.settings,
},
Err(error) => RemoteRefEnvelope::Failed(error),
}
.into_bytes()
}
fn from_bytes(bytes: Vec<u8>) -> Self {
match RemoteRefEnvelope::from_bytes(bytes) {
RemoteRefEnvelope::Ready { endpoint, settings } => {
Self::remote(settings, RemoteSinkRefState::Ready(endpoint))
}
RemoteRefEnvelope::Failed(error) => Self::remote(
StreamRefSettings::default(),
RemoteSinkRefState::Failed(error),
),
}
}
}
impl<T> SinkRef<T>
where
T: stream_ref_actor_bound::Bound + Send + 'static,
{
#[must_use]
pub fn sink(&self) -> Sink<T, StreamCompletion<NotUsed>> {
let inner = Arc::clone(&self.inner);
Sink::from_runner(move |input, _materializer| {
if inner.subscribed.swap(true, Ordering::SeqCst) {
return Ok(StreamCompletion::ready(Err(StreamError::Failed(
"sink ref has already been materialized".to_owned(),
))));
}
#[cfg(feature = "cluster")]
if let Some(remote) = &inner.remote {
return match remote {
RemoteSinkRefState::Ready(endpoint) => {
materialize_remote_sink_ref(endpoint.clone(), inner.settings, input)
}
RemoteSinkRefState::Failed(error) => {
Ok(StreamCompletion::ready(Err(error.clone())))
}
};
}
Ok(inner.direct.attach_input(input))
})
}
}
#[cfg(feature = "cluster")]
impl<T> SinkRef<T>
where
T: BytesConvertable + Send + 'static,
{
fn remote(settings: StreamRefSettings, remote: RemoteSinkRefState) -> Self {
Self {
inner: Arc::new(SinkRefInner {
direct: Arc::new(SinkRefDirectState::new()),
#[cfg(feature = "cluster")]
settings,
subscribed: AtomicBool::new(false),
remote: Some(remote),
}),
}
}
fn activate_remote_consumer(&self) -> StreamResult<RemoteStreamRefEndpoint> {
if let Some(RemoteSinkRefState::Ready(endpoint)) = &self.inner.remote {
return Ok(endpoint.clone());
}
if let Some(RemoteSinkRefState::Failed(error)) = &self.inner.remote {
return Err(error.clone());
}
if self.inner.subscribed.swap(true, Ordering::SeqCst) {
return Err(StreamError::Failed(
"sink ref has already been materialized".to_owned(),
));
}
let shared = Arc::new(ConsumerShared::new(self.inner.settings));
let endpoint = RemoteStreamRefEndpoint::new("sink");
let (actor, _handle) = spawn_consumer_actor(
None,
Arc::clone(&shared),
self.inner.settings,
Some(endpoint.clone()),
)?;
join_remote_endpoint(&endpoint, &actor);
let stream = ConsumerStream {
shared,
actor_ref: Some(actor),
endpoint: Some(endpoint.clone()),
settings: self.inner.settings,
terminated: false,
source_ref_keep_alive: None,
};
self.inner.direct.attach_input_unmanaged(Box::new(stream))?;
Ok(endpoint)
}
}
#[allow(dead_code)]
enum ProducerCommand<T> {
Subscribe {
consumer: ActorRef<ConsumerCommand<T>>,
},
#[cfg(feature = "cluster")]
SubscribeRemote {
consumer: RemoteStreamRefEndpoint,
},
Demand {
consumer: ActorRef<ConsumerCommand<T>>,
cumulative: u64,
},
#[cfg(feature = "cluster")]
DemandRemote {
consumer: RemoteStreamRefEndpoint,
cumulative: u64,
},
Cancel {
consumer: ActorRef<ConsumerCommand<T>>,
},
#[cfg(feature = "cluster")]
CancelRemote {
consumer: RemoteStreamRefEndpoint,
},
RemoteFailure {
consumer: ActorRef<ConsumerCommand<T>>,
error: StreamError,
},
#[cfg(feature = "cluster")]
RemoteFailureRemote {
consumer: RemoteStreamRefEndpoint,
error: StreamError,
},
Ack,
}
#[cfg(feature = "cluster")]
impl<T> Message for ProducerCommand<T>
where
T: BytesConvertable + Send + 'static,
{
fn serializable() -> bool {
true
}
fn serialize(
self,
) -> Result<ractor::message::SerializedMessage, ractor::message::BoxedDowncastErr> {
match self {
Self::SubscribeRemote { consumer } => {
Ok(remote_cast("SubscribeRemote", vec![consumer.into_bytes()]))
}
Self::DemandRemote {
consumer,
cumulative,
} => Ok(remote_cast(
"DemandRemote",
vec![consumer.into_bytes(), cumulative.into_bytes()],
)),
Self::CancelRemote { consumer } => {
Ok(remote_cast("CancelRemote", vec![consumer.into_bytes()]))
}
Self::RemoteFailureRemote { consumer, error } => Ok(remote_cast(
"RemoteFailureRemote",
vec![consumer.into_bytes(), error.into_bytes()],
)),
Self::Ack => Ok(remote_cast("Ack", Vec::new())),
Self::Subscribe { .. }
| Self::Demand { .. }
| Self::Cancel { .. }
| Self::RemoteFailure { .. } => Err(ractor::message::BoxedDowncastErr),
}
}
fn deserialize(
message: ractor::message::SerializedMessage,
) -> Result<Self, ractor::message::BoxedDowncastErr> {
let ractor::message::SerializedMessage::Cast { variant, args, .. } = message else {
return Err(ractor::message::BoxedDowncastErr);
};
let mut cursor = 0;
match variant.as_str() {
"SubscribeRemote" => Ok(Self::SubscribeRemote {
consumer: RemoteStreamRefEndpoint::from_bytes(take_bytes(&args, &mut cursor)),
}),
"DemandRemote" => Ok(Self::DemandRemote {
consumer: RemoteStreamRefEndpoint::from_bytes(take_bytes(&args, &mut cursor)),
cumulative: u64::from_bytes(take_bytes(&args, &mut cursor)),
}),
"CancelRemote" => Ok(Self::CancelRemote {
consumer: RemoteStreamRefEndpoint::from_bytes(take_bytes(&args, &mut cursor)),
}),
"RemoteFailureRemote" => Ok(Self::RemoteFailureRemote {
consumer: RemoteStreamRefEndpoint::from_bytes(take_bytes(&args, &mut cursor)),
error: StreamError::from_bytes(take_bytes(&args, &mut cursor)),
}),
"Ack" => Ok(Self::Ack),
_ => Err(ractor::message::BoxedDowncastErr),
}
}
}
#[allow(dead_code)]
enum ConsumerCommand<T> {
OnSubscribe {
producer: ActorRef<ProducerCommand<T>>,
},
#[cfg(feature = "cluster")]
OnSubscribeRemote { producer: RemoteStreamRefEndpoint },
Element {
producer: ActorRef<ProducerCommand<T>>,
seq: u64,
item: T,
},
#[cfg(feature = "cluster")]
ElementRemote {
producer: RemoteStreamRefEndpoint,
seq: u64,
item: T,
},
Complete {
producer: ActorRef<ProducerCommand<T>>,
seq: u64,
},
#[cfg(feature = "cluster")]
CompleteRemote {
producer: RemoteStreamRefEndpoint,
seq: u64,
},
Failure {
producer: ActorRef<ProducerCommand<T>>,
error: StreamError,
},
#[cfg(feature = "cluster")]
FailureRemote {
producer: RemoteStreamRefEndpoint,
error: StreamError,
},
}
#[cfg(feature = "cluster")]
impl<T> Message for ConsumerCommand<T>
where
T: BytesConvertable + Send + 'static,
{
fn serializable() -> bool {
true
}
fn serialize(
self,
) -> Result<ractor::message::SerializedMessage, ractor::message::BoxedDowncastErr> {
match self {
Self::OnSubscribeRemote { producer } => Ok(remote_cast(
"OnSubscribeRemote",
vec![producer.into_bytes()],
)),
Self::ElementRemote {
producer,
seq,
item,
} => Ok(remote_cast(
"ElementRemote",
vec![producer.into_bytes(), seq.into_bytes(), item.into_bytes()],
)),
Self::CompleteRemote { producer, seq } => Ok(remote_cast(
"CompleteRemote",
vec![producer.into_bytes(), seq.into_bytes()],
)),
Self::FailureRemote { producer, error } => Ok(remote_cast(
"FailureRemote",
vec![producer.into_bytes(), error.into_bytes()],
)),
Self::OnSubscribe { .. }
| Self::Element { .. }
| Self::Complete { .. }
| Self::Failure { .. } => Err(ractor::message::BoxedDowncastErr),
}
}
fn deserialize(
message: ractor::message::SerializedMessage,
) -> Result<Self, ractor::message::BoxedDowncastErr> {
let ractor::message::SerializedMessage::Cast { variant, args, .. } = message else {
return Err(ractor::message::BoxedDowncastErr);
};
let mut cursor = 0;
match variant.as_str() {
"OnSubscribeRemote" => Ok(Self::OnSubscribeRemote {
producer: RemoteStreamRefEndpoint::from_bytes(take_bytes(&args, &mut cursor)),
}),
"ElementRemote" => Ok(Self::ElementRemote {
producer: RemoteStreamRefEndpoint::from_bytes(take_bytes(&args, &mut cursor)),
seq: u64::from_bytes(take_bytes(&args, &mut cursor)),
item: T::from_bytes(take_bytes(&args, &mut cursor)),
}),
"CompleteRemote" => Ok(Self::CompleteRemote {
producer: RemoteStreamRefEndpoint::from_bytes(take_bytes(&args, &mut cursor)),
seq: u64::from_bytes(take_bytes(&args, &mut cursor)),
}),
"FailureRemote" => Ok(Self::FailureRemote {
producer: RemoteStreamRefEndpoint::from_bytes(take_bytes(&args, &mut cursor)),
error: StreamError::from_bytes(take_bytes(&args, &mut cursor)),
}),
_ => Err(ractor::message::BoxedDowncastErr),
}
}
}
#[allow(dead_code)]
enum ConsumerEndpoint<T> {
Actor(ActorRef<ConsumerCommand<T>>),
Direct(Weak<ConsumerShared<T>>),
}
impl<T> Clone for ConsumerEndpoint<T> {
fn clone(&self) -> Self {
match self {
Self::Actor(actor) => Self::Actor(actor.clone()),
Self::Direct(shared) => Self::Direct(Weak::clone(shared)),
}
}
}
#[allow(dead_code)]
enum ProducerEndpoint<T> {
Actor(ActorRef<ProducerCommand<T>>),
Direct(Weak<ProducerShared<T>>),
}
impl<T> Clone for ProducerEndpoint<T> {
fn clone(&self) -> Self {
match self {
Self::Actor(actor) => Self::Actor(actor.clone()),
Self::Direct(shared) => Self::Direct(Weak::clone(shared)),
}
}
}
#[allow(dead_code)]
enum ProducerEndpointHandle<T> {
Actor {
actor: ActorRef<ProducerCommand<T>>,
shared: Arc<ProducerShared<T>>,
#[cfg(feature = "cluster")]
endpoint: Option<RemoteStreamRefEndpoint>,
},
Direct(Arc<ProducerShared<T>>),
}
impl<T> ProducerEndpointHandle<T>
where
T: stream_ref_actor_bound::Bound + Send + 'static,
{
fn stop_actor(&self) {
if let Self::Actor { actor, .. } = self {
actor.stop(None);
}
}
fn actor_ref(&self) -> Option<ActorRef<ProducerCommand<T>>> {
match self {
Self::Actor { actor, .. } => Some(actor.clone()),
Self::Direct(_) => None,
}
}
#[cfg(feature = "cluster")]
fn endpoint(&self) -> Option<RemoteStreamRefEndpoint> {
match self {
Self::Actor { endpoint, .. } => endpoint.clone(),
Self::Direct(_) => None,
}
}
fn direct_shared(&self) -> Option<Arc<ProducerShared<T>>> {
match self {
Self::Actor { shared, .. } | Self::Direct(shared) => Some(Arc::clone(shared)),
}
}
}
#[cfg(feature = "cluster")]
struct ProducerEndpointDropGuard<T: stream_ref_actor_bound::Bound> {
actor: Option<ActorRef<ProducerCommand<T>>>,
cancelled: Option<Arc<AtomicBool>>,
}
#[cfg(feature = "cluster")]
impl<T: stream_ref_actor_bound::Bound> ProducerEndpointDropGuard<T> {
fn new(actor: ActorRef<ProducerCommand<T>>, cancelled: Option<Arc<AtomicBool>>) -> Self {
Self {
actor: Some(actor),
cancelled,
}
}
}
#[cfg(feature = "cluster")]
impl<T: stream_ref_actor_bound::Bound> Drop for ProducerEndpointDropGuard<T> {
fn drop(&mut self) {
if let Some(actor) = self.actor.take() {
actor.stop(None);
}
if let Some(cancelled) = self.cancelled.as_ref() {
cancelled.store(true, Ordering::SeqCst);
}
}
}
#[allow(dead_code)]
struct LazyProducerStatus<T> {
direct: Arc<SourceRefDirectState<T>>,
}
#[allow(dead_code)]
impl<T> LazyProducerStatus<T>
where
T: Send + 'static,
{
fn new(direct: Arc<SourceRefDirectState<T>>) -> Self {
Self { direct }
}
fn get_status(&self) -> ractor::ActorStatus {
self.direct.status()
}
}
struct SourceRefDirectState<T> {
inner: Mutex<SourceRefDirectInner<T>>,
}
struct SourceRefDirectInner<T> {
input: Option<BoxStream<T>>,
terminal: Option<StreamError>,
subscribed: bool,
}
impl<T> SourceRefDirectState<T>
where
T: Send + 'static,
{
fn new(input: BoxStream<T>) -> Self {
Self {
inner: Mutex::new(SourceRefDirectInner {
input: Some(input),
terminal: None,
subscribed: false,
}),
}
}
#[cfg(feature = "cluster")]
fn terminal(error: StreamError) -> Self {
Self {
inner: Mutex::new(SourceRefDirectInner {
input: None,
terminal: Some(error),
subscribed: true,
}),
}
}
fn lock(&self) -> MutexGuard<'_, SourceRefDirectInner<T>> {
self.inner
.lock()
.unwrap_or_else(|poison| poison.into_inner())
}
fn claim_input(&self) -> StreamResult<BoxStream<T>> {
let mut inner = self.lock();
inner.subscribed = true;
if let Some(error) = inner.terminal.clone() {
return Err(error);
}
inner.input.take().ok_or(StreamError::ActorTerminated)
}
fn timeout_if_unsubscribed(&self) {
let mut inner = self.lock();
if !inner.subscribed && inner.terminal.is_none() {
let input = inner.input.take();
inner.terminal = Some(subscription_timeout_error("stream ref sink"));
drop(inner);
drop(input);
}
}
#[allow(dead_code)]
fn status(&self) -> ractor::ActorStatus {
let inner = self.lock();
if inner.terminal.is_some() || inner.input.is_none() {
ractor::ActorStatus::Stopped
} else {
ractor::ActorStatus::Running
}
}
}
struct SourceRefDirectStream<T>
where
T: Send + 'static,
{
input: Option<BoxStream<T>>,
queue: VecDeque<T>,
terminal: Option<ConsumerTerminal>,
settings: StreamRefSettings,
terminated: bool,
_keep_alive: Arc<SourceRefInner<T>>,
}
impl<T> SourceRefDirectStream<T>
where
T: Send + 'static,
{
fn new(
input: BoxStream<T>,
settings: StreamRefSettings,
keep_alive: Arc<SourceRefInner<T>>,
) -> Self {
Self {
input: Some(input),
queue: VecDeque::new(),
terminal: None,
settings,
terminated: false,
_keep_alive: keep_alive,
}
}
fn fill_prefetch(&mut self) {
while self.terminal.is_none() && self.queue.len() < self.settings.buffer_capacity {
let Some(input) = self.input.as_mut() else {
self.terminal = Some(ConsumerTerminal::Error(StreamError::Cancelled));
break;
};
match input.next() {
Some(Ok(item)) => self.queue.push_back(item),
Some(Err(error)) => {
self.input.take();
self.terminal = Some(ConsumerTerminal::Error(error));
}
None => {
self.input.take();
self.terminal = Some(ConsumerTerminal::Complete);
}
}
}
}
}
impl<T> Iterator for SourceRefDirectStream<T>
where
T: Send + 'static,
{
type Item = StreamResult<T>;
fn next(&mut self) -> Option<Self::Item> {
if self.terminated {
return None;
}
self.fill_prefetch();
if let Some(item) = self.queue.pop_front() {
self.fill_prefetch();
return Some(Ok(item));
}
match self.terminal.clone() {
Some(ConsumerTerminal::Complete) => {
self.terminated = true;
None
}
Some(ConsumerTerminal::Error(error)) => {
self.terminated = true;
Some(Err(error))
}
None => None,
}
}
}
impl<T> Drop for SourceRefDirectStream<T>
where
T: Send + 'static,
{
fn drop(&mut self) {
self.input.take();
self.queue.clear();
}
}
struct SinkRefDirectState<T> {
inner: Mutex<SinkRefDirectInner<T>>,
changed: Condvar,
}
struct SinkRefDirectInner<T> {
input: Option<BoxStream<T>>,
completion: Option<oneshot::Sender<StreamResult<NotUsed>>>,
completion_cancelled: Option<Arc<AtomicBool>>,
terminal: Option<StreamError>,
subscribed: bool,
}
impl<T> SinkRefDirectState<T>
where
T: Send + 'static,
{
fn new() -> Self {
Self {
inner: Mutex::new(SinkRefDirectInner {
input: None,
completion: None,
completion_cancelled: None,
terminal: None,
subscribed: false,
}),
changed: Condvar::new(),
}
}
fn lock(&self) -> MutexGuard<'_, SinkRefDirectInner<T>> {
self.inner
.lock()
.unwrap_or_else(|poison| poison.into_inner())
}
fn attach_input(&self, input: BoxStream<T>) -> StreamCompletion<NotUsed> {
let mut inner = self.lock();
if let Some(error) = inner.terminal.clone() {
return StreamCompletion::ready(Err(error));
}
if inner.subscribed {
return StreamCompletion::ready(Err(StreamError::Failed(
"stream ref was already subscribed by another endpoint".to_owned(),
)));
}
let (sender, receiver) = oneshot::channel();
let cancellation = StreamCancellation::for_external_completion();
inner.input = Some(input);
inner.completion = Some(sender);
inner.completion_cancelled = Some(cancellation.cancelled());
inner.subscribed = true;
drop(inner);
self.changed.notify_all();
StreamCompletion::from_receiver(receiver, Some(cancellation))
}
#[cfg(feature = "cluster")]
fn attach_input_unmanaged(&self, input: BoxStream<T>) -> StreamResult<()> {
let mut inner = self.lock();
if let Some(error) = inner.terminal.clone() {
return Err(error);
}
if inner.subscribed {
return Err(StreamError::Failed(
"stream ref was already subscribed by another endpoint".to_owned(),
));
}
inner.input = Some(input);
inner.completion = None;
inner.completion_cancelled = None;
inner.subscribed = true;
drop(inner);
self.changed.notify_all();
Ok(())
}
fn wait_for_input(&self, settings: StreamRefSettings) -> StreamResult<BoxStream<T>> {
let deadline = Instant::now()
.checked_add(settings.subscription_timeout)
.unwrap_or_else(far_future);
let mut inner = self.lock();
loop {
if let Some(cancelled) = &inner.completion_cancelled
&& cancelled.load(Ordering::SeqCst)
{
let input = inner.input.take();
inner.terminal = Some(StreamError::Cancelled);
inner.completion.take();
inner.completion_cancelled = None;
drop(inner);
drop(input);
self.changed.notify_all();
return Err(StreamError::Cancelled);
}
if let Some(input) = inner.input.take() {
return Ok(input);
}
if let Some(error) = inner.terminal.clone() {
return Err(error);
}
let now = Instant::now();
if now >= deadline {
let error = subscription_timeout_error("stream ref source");
inner.terminal = Some(error.clone());
if let Some(sender) = inner.completion.take() {
let _ = sender.send(Err(error.clone()));
}
return Err(error);
}
let remaining = deadline.saturating_duration_since(now);
let (next, _) =
wait_timeout_unpoison(&self.changed, inner, remaining.min(STREAM_REF_WAIT_POLL));
inner = next;
}
}
fn settle(&self, result: StreamResult<NotUsed>) {
let mut inner = self.lock();
if let Some(sender) = inner.completion.take() {
let _ = sender.send(result);
}
inner.completion_cancelled = None;
drop(inner);
self.changed.notify_all();
}
fn fail_unattached(&self, error: StreamError) {
let mut inner = self.lock();
if inner.terminal.is_none() {
let input = inner.input.take();
inner.terminal = Some(error.clone());
if let Some(sender) = inner.completion.take() {
let _ = sender.send(Err(error));
}
inner.completion_cancelled = None;
drop(inner);
drop(input);
self.changed.notify_all();
}
}
fn is_completion_cancelled(&self) -> bool {
self.lock()
.completion_cancelled
.as_ref()
.is_some_and(|cancelled| cancelled.load(Ordering::SeqCst))
}
}
struct SinkRefDirectStream<T>
where
T: Send + 'static,
{
state: Arc<SinkRefDirectState<T>>,
input: Option<BoxStream<T>>,
queue: VecDeque<T>,
terminal: Option<ConsumerTerminal>,
settings: StreamRefSettings,
terminated: bool,
}
impl<T> SinkRefDirectStream<T>
where
T: Send + 'static,
{
fn new(state: Arc<SinkRefDirectState<T>>, settings: StreamRefSettings) -> Self {
Self {
state,
input: None,
queue: VecDeque::new(),
terminal: None,
settings,
terminated: false,
}
}
fn ensure_input(&mut self) -> StreamResult<()> {
if self.input.is_none() && self.terminal.is_none() {
self.input = Some(self.state.wait_for_input(self.settings)?);
}
Ok(())
}
fn fill_prefetch(&mut self) -> StreamResult<()> {
self.ensure_input()?;
while self.terminal.is_none() && self.queue.len() < self.settings.buffer_capacity {
if self.state.is_completion_cancelled() {
self.input.take();
self.state.settle(Err(StreamError::Cancelled));
self.terminal = Some(ConsumerTerminal::Error(StreamError::Cancelled));
break;
}
let Some(input) = self.input.as_mut() else {
self.terminal = Some(ConsumerTerminal::Error(StreamError::Cancelled));
break;
};
match input.next() {
Some(Ok(item)) => self.queue.push_back(item),
Some(Err(error)) => {
self.input.take();
self.state.settle(Err(error.clone()));
self.terminal = Some(ConsumerTerminal::Error(error));
}
None => {
self.input.take();
self.state.settle(Ok(NotUsed));
self.terminal = Some(ConsumerTerminal::Complete);
}
}
}
Ok(())
}
}
impl<T> Iterator for SinkRefDirectStream<T>
where
T: Send + 'static,
{
type Item = StreamResult<T>;
fn next(&mut self) -> Option<Self::Item> {
if self.terminated {
return None;
}
if let Err(error) = self.fill_prefetch() {
self.terminated = true;
return Some(Err(error));
}
if let Some(item) = self.queue.pop_front() {
if let Err(error) = self.fill_prefetch() {
self.terminal = Some(ConsumerTerminal::Error(error));
}
return Some(Ok(item));
}
match self.terminal.clone() {
Some(ConsumerTerminal::Complete) => {
self.terminated = true;
None
}
Some(ConsumerTerminal::Error(error)) => {
self.terminated = true;
Some(Err(error))
}
None => None,
}
}
}
impl<T> Drop for SinkRefDirectStream<T>
where
T: Send + 'static,
{
fn drop(&mut self) {
if !self.terminated {
self.input.take();
self.queue.clear();
self.state.fail_unattached(StreamError::Cancelled);
self.state.settle(Err(StreamError::Cancelled));
}
}
}
#[allow(dead_code)]
struct ProducerShared<T> {
inner: Mutex<ProducerInner<T>>,
changed: Condvar,
}
#[allow(dead_code)]
struct ProducerInner<T> {
consumer: Option<ConsumerEndpoint<T>>,
cumulative_demand: u64,
sent: u64,
stopped: Option<StreamError>,
}
#[allow(dead_code)]
impl<T> ProducerShared<T>
where
T: stream_ref_actor_bound::Bound + Send + 'static,
{
fn new() -> Self {
Self {
inner: Mutex::new(ProducerInner {
consumer: None,
cumulative_demand: 0,
sent: 0,
stopped: None,
}),
changed: Condvar::new(),
}
}
fn lock(&self) -> MutexGuard<'_, ProducerInner<T>> {
self.inner
.lock()
.unwrap_or_else(|poison| poison.into_inner())
}
fn set_consumer(
&self,
consumer: ActorRef<ConsumerCommand<T>>,
) -> Result<bool, ActorRef<ConsumerCommand<T>>> {
let mut inner = self.lock();
match &inner.consumer {
Some(ConsumerEndpoint::Actor(existing)) if same_actor(existing, &consumer) => Ok(false),
Some(_) => Err(consumer),
None if inner.stopped.is_some() => Err(consumer),
None => {
inner.consumer = Some(ConsumerEndpoint::Actor(consumer));
drop(inner);
self.changed.notify_all();
Ok(true)
}
}
}
fn set_direct_consumer(&self, consumer: &Arc<ConsumerShared<T>>) -> StreamResult<bool> {
let mut inner = self.lock();
match &inner.consumer {
Some(ConsumerEndpoint::Direct(existing))
if existing.ptr_eq(&Arc::downgrade(consumer)) =>
{
Ok(false)
}
Some(_) => Err(StreamError::Failed(
"stream ref was already subscribed by another endpoint".to_owned(),
)),
None => {
if let Some(error) = inner.stopped.clone() {
return Err(error);
}
inner.consumer = Some(ConsumerEndpoint::Direct(Arc::downgrade(consumer)));
drop(inner);
self.changed.notify_all();
Ok(true)
}
}
}
fn update_demand(&self, consumer: &ActorRef<ConsumerCommand<T>>, cumulative: u64) {
self.update_demand_if(cumulative, |existing| {
matches!(existing, ConsumerEndpoint::Actor(actor) if same_actor(actor, consumer))
});
}
fn update_direct_demand(&self, consumer: &Arc<ConsumerShared<T>>, cumulative: u64) {
let consumer = Arc::downgrade(consumer);
self.update_demand_if(cumulative, |existing| {
matches!(existing, ConsumerEndpoint::Direct(shared) if shared.ptr_eq(&consumer))
});
}
fn update_demand_if(
&self,
cumulative: u64,
matches_consumer: impl FnOnce(&ConsumerEndpoint<T>) -> bool,
) {
let mut inner = self.lock();
if inner.consumer.as_ref().is_some_and(matches_consumer)
&& cumulative > inner.cumulative_demand
{
inner.cumulative_demand = cumulative;
drop(inner);
self.changed.notify_all();
}
}
fn stop_from_consumer(&self, consumer: &ActorRef<ConsumerCommand<T>>, error: StreamError) {
self.stop_from_consumer_if(error, |existing| {
matches!(existing, ConsumerEndpoint::Actor(actor) if same_actor(actor, consumer))
});
}
fn stop_from_direct_consumer(&self, consumer: &Arc<ConsumerShared<T>>, error: StreamError) {
let consumer = Arc::downgrade(consumer);
self.stop_from_consumer_if(error, |existing| {
matches!(existing, ConsumerEndpoint::Direct(shared) if shared.ptr_eq(&consumer))
});
}
fn stop_from_consumer_if(
&self,
error: StreamError,
matches_consumer: impl FnOnce(&ConsumerEndpoint<T>) -> bool,
) {
let mut inner = self.lock();
if inner.consumer.as_ref().is_none_or(matches_consumer) && inner.stopped.is_none() {
inner.stopped = Some(error);
drop(inner);
self.changed.notify_all();
}
}
fn stop_unless_finished(&self, error: StreamError) {
let mut inner = self.lock();
if inner.stopped.is_none() {
inner.stopped = Some(error);
drop(inner);
self.changed.notify_all();
}
}
}
#[allow(dead_code)]
struct ConsumerShared<T> {
inner: Mutex<ConsumerInner<T>>,
changed: Condvar,
}
#[allow(dead_code)]
struct ConsumerInner<T> {
producer: Option<ProducerEndpoint<T>>,
queue: VecDeque<T>,
terminal: Option<ConsumerTerminal>,
expected_seq: u64,
delivered: u64,
cumulative_demand: u64,
}
#[derive(Clone)]
enum ConsumerTerminal {
Complete,
Error(StreamError),
}
#[allow(dead_code)]
impl<T> ConsumerShared<T>
where
T: stream_ref_actor_bound::Bound + Send + 'static,
{
fn new(_settings: StreamRefSettings) -> Self {
Self {
inner: Mutex::new(ConsumerInner {
producer: None,
queue: VecDeque::new(),
terminal: None,
expected_seq: 0,
delivered: 0,
cumulative_demand: 0,
}),
changed: Condvar::new(),
}
}
fn lock(&self) -> MutexGuard<'_, ConsumerInner<T>> {
self.inner
.lock()
.unwrap_or_else(|poison| poison.into_inner())
}
fn set_producer(&self, producer: ActorRef<ProducerCommand<T>>) -> bool {
let mut inner = self.lock();
match &inner.producer {
Some(ProducerEndpoint::Actor(existing)) => same_actor(existing, &producer),
Some(ProducerEndpoint::Direct(_)) => false,
None => {
inner.producer = Some(ProducerEndpoint::Actor(producer));
drop(inner);
self.changed.notify_all();
true
}
}
}
fn set_direct_producer(&self, producer: &Arc<ProducerShared<T>>) -> StreamResult<bool> {
let mut inner = self.lock();
match &inner.producer {
Some(ProducerEndpoint::Direct(existing))
if existing.ptr_eq(&Arc::downgrade(producer)) =>
{
Ok(false)
}
Some(_) => Err(StreamError::Failed(
"stream ref was already subscribed by another endpoint".to_owned(),
)),
None => {
if let Some(terminal) = inner.terminal.clone() {
return match terminal {
ConsumerTerminal::Complete => Err(StreamError::ActorTerminated),
ConsumerTerminal::Error(error) => Err(error),
};
}
inner.producer = Some(ProducerEndpoint::Direct(Arc::downgrade(producer)));
drop(inner);
self.changed.notify_all();
Ok(true)
}
}
}
fn push(&self, producer: &ActorRef<ProducerCommand<T>>, seq: u64, item: T) {
self.push_if(seq, item, |inner| producer_matches_actor(inner, producer));
}
fn push_direct(&self, producer: &Arc<ProducerShared<T>>, seq: u64, item: T) {
let producer = Arc::downgrade(producer);
self.push_if(seq, item, |inner| producer_matches_direct(inner, &producer));
}
fn push_if(&self, seq: u64, item: T, matches_producer: impl FnOnce(&ConsumerInner<T>) -> bool) {
let mut inner = self.lock();
if inner.terminal.is_some() || !matches_producer(&inner) {
return;
}
let should_notify;
if seq != inner.expected_seq {
inner.queue.clear();
inner.terminal = Some(ConsumerTerminal::Error(invalid_sequence_error(
inner.expected_seq,
seq,
"stream ref element sequence gap",
)));
should_notify = true;
} else {
should_notify = inner.queue.is_empty();
inner.expected_seq += 1;
inner.queue.push_back(item);
}
drop(inner);
if should_notify {
self.changed.notify_all();
}
}
fn complete(&self, producer: &ActorRef<ProducerCommand<T>>, seq: u64) {
self.complete_if(seq, |inner| producer_matches_actor(inner, producer));
}
fn complete_direct(&self, producer: &Arc<ProducerShared<T>>, seq: u64) {
let producer = Arc::downgrade(producer);
self.complete_if(seq, |inner| producer_matches_direct(inner, &producer));
}
fn complete_if(&self, seq: u64, matches_producer: impl FnOnce(&ConsumerInner<T>) -> bool) {
let mut inner = self.lock();
if inner.terminal.is_some() || !matches_producer(&inner) {
return;
}
if seq != inner.expected_seq {
inner.queue.clear();
inner.terminal = Some(ConsumerTerminal::Error(invalid_sequence_error(
inner.expected_seq,
seq,
"stream ref completion sequence gap",
)));
} else {
inner.terminal = Some(ConsumerTerminal::Complete);
}
drop(inner);
self.changed.notify_all();
}
fn fail(&self, producer: &ActorRef<ProducerCommand<T>>, error: StreamError) {
self.fail_if(error, |inner| producer_matches_actor(inner, producer));
}
fn fail_direct(&self, producer: &Arc<ProducerShared<T>>, error: StreamError) {
let producer = Arc::downgrade(producer);
self.fail_if(error, |inner| producer_matches_direct(inner, &producer));
}
fn fail_if(
&self,
error: StreamError,
matches_producer: impl FnOnce(&ConsumerInner<T>) -> bool,
) {
let mut inner = self.lock();
if inner.terminal.is_some() || !matches_producer(&inner) {
return;
}
inner.queue.clear();
inner.terminal = Some(ConsumerTerminal::Error(error));
drop(inner);
self.changed.notify_all();
}
fn fail_local(&self, error: StreamError) {
let mut inner = self.lock();
if inner.terminal.is_none() {
inner.queue.clear();
inner.terminal = Some(ConsumerTerminal::Error(error));
drop(inner);
self.changed.notify_all();
}
}
}
#[allow(dead_code)]
fn producer_matches_actor<T>(
inner: &ConsumerInner<T>,
producer: &ActorRef<ProducerCommand<T>>,
) -> bool
where
T: stream_ref_actor_bound::Bound + Send + 'static,
{
matches!(
inner.producer.as_ref(),
Some(ProducerEndpoint::Actor(existing)) if same_actor(existing, producer)
)
}
#[allow(dead_code)]
fn producer_matches_direct<T>(inner: &ConsumerInner<T>, producer: &Weak<ProducerShared<T>>) -> bool
where
T: stream_ref_actor_bound::Bound + Send + 'static,
{
matches!(
inner.producer.as_ref(),
Some(ProducerEndpoint::Direct(existing)) if existing.ptr_eq(producer)
)
}
#[allow(dead_code)]
struct ProducerActor<T> {
shared: Arc<ProducerShared<T>>,
initial_consumer: Option<ActorRef<ConsumerCommand<T>>>,
settings: StreamRefSettings,
#[cfg(feature = "cluster")]
endpoint: Option<RemoteStreamRefEndpoint>,
}
impl<T> Actor for ProducerActor<T>
where
T: stream_ref_actor_bound::Bound + Send + 'static,
{
type Msg = ProducerCommand<T>;
type State = ();
type Arguments = ();
async fn pre_start(
&self,
myself: ActorRef<Self::Msg>,
_args: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
if let Some(consumer) = &self.initial_consumer {
register_producer_consumer(
&self.shared,
myself,
consumer.clone(),
#[cfg(feature = "cluster")]
self.endpoint.as_ref(),
);
}
Ok(())
}
async fn handle(
&self,
myself: ActorRef<Self::Msg>,
message: Self::Msg,
_state: &mut Self::State,
) -> ActorResult {
match message {
ProducerCommand::Subscribe { consumer } => {
register_producer_consumer(
&self.shared,
myself,
consumer,
#[cfg(feature = "cluster")]
self.endpoint.as_ref(),
);
}
#[cfg(feature = "cluster")]
ProducerCommand::SubscribeRemote { consumer } => {
match resolve_remote_actor(&consumer, self.settings.subscription_timeout) {
Ok(consumer) => register_producer_consumer(
&self.shared,
myself,
consumer,
self.endpoint.as_ref(),
),
Err(error) => self.shared.stop_unless_finished(error),
}
}
ProducerCommand::Demand {
consumer,
cumulative,
} => self.shared.update_demand(&consumer, cumulative),
#[cfg(feature = "cluster")]
ProducerCommand::DemandRemote {
consumer,
cumulative,
} => {
if let Ok(consumer) =
resolve_remote_actor(&consumer, self.settings.subscription_timeout)
{
self.shared.update_demand(&consumer, cumulative);
}
}
ProducerCommand::Cancel { consumer } => {
self.shared
.stop_from_consumer(&consumer, StreamError::Cancelled);
}
#[cfg(feature = "cluster")]
ProducerCommand::CancelRemote { consumer } => {
if let Ok(consumer) =
resolve_remote_actor(&consumer, self.settings.subscription_timeout)
{
self.shared
.stop_from_consumer(&consumer, StreamError::Cancelled);
}
}
ProducerCommand::RemoteFailure { consumer, error } => {
self.shared.stop_from_consumer(&consumer, error);
}
#[cfg(feature = "cluster")]
ProducerCommand::RemoteFailureRemote { consumer, error } => {
if let Ok(consumer) =
resolve_remote_actor(&consumer, self.settings.subscription_timeout)
{
self.shared.stop_from_consumer(&consumer, error);
}
}
ProducerCommand::Ack => myself.stop(None),
}
Ok(())
}
async fn post_stop(
&self,
_myself: ActorRef<Self::Msg>,
_state: &mut Self::State,
) -> ActorResult {
self.shared
.stop_unless_finished(StreamError::ActorTerminated);
Ok(())
}
}
#[allow(dead_code)]
fn register_producer_consumer<T>(
shared: &Arc<ProducerShared<T>>,
producer: ActorRef<ProducerCommand<T>>,
consumer: ActorRef<ConsumerCommand<T>>,
#[cfg(feature = "cluster")] producer_endpoint: Option<&RemoteStreamRefEndpoint>,
) where
T: stream_ref_actor_bound::Bound + Send + 'static,
{
match shared.set_consumer(consumer.clone()) {
Ok(true) | Ok(false) => {
let _ = cast_consumer_on_subscribe(
&consumer,
producer.clone(),
#[cfg(feature = "cluster")]
producer_endpoint,
);
}
Err(duplicate) => {
let _ = cast_consumer_failure(
&duplicate,
producer,
#[cfg(feature = "cluster")]
producer_endpoint,
StreamError::Failed(
"stream ref was already subscribed by another endpoint".to_owned(),
),
);
}
}
}
#[allow(dead_code)]
fn register_direct_producer_consumer<T>(
producer: &Arc<ProducerShared<T>>,
consumer: &Arc<ConsumerShared<T>>,
) -> StreamResult<()>
where
T: stream_ref_actor_bound::Bound + Send + 'static,
{
consumer.set_direct_producer(producer)?;
if let Err(error) = producer.set_direct_consumer(consumer) {
consumer.fail_local(error.clone());
return Err(error);
}
Ok(())
}
#[allow(dead_code)]
struct ConsumerActor<T> {
shared: Arc<ConsumerShared<T>>,
initial_producer: Option<ActorRef<ProducerCommand<T>>>,
settings: StreamRefSettings,
#[cfg(feature = "cluster")]
endpoint: Option<RemoteStreamRefEndpoint>,
}
impl<T> Actor for ConsumerActor<T>
where
T: stream_ref_actor_bound::Bound + Send + 'static,
{
type Msg = ConsumerCommand<T>;
type State = ();
type Arguments = ();
async fn pre_start(
&self,
myself: ActorRef<Self::Msg>,
_args: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
if let Some(producer) = &self.initial_producer {
self.shared.set_producer(producer.clone());
if let Err(error) = cast_producer_subscribe(
producer,
myself.clone(),
#[cfg(feature = "cluster")]
self.endpoint.as_ref(),
) {
self.shared.fail_local(error);
myself.stop(None);
}
}
Ok(())
}
async fn handle(
&self,
_myself: ActorRef<Self::Msg>,
message: Self::Msg,
_state: &mut Self::State,
) -> ActorResult {
match message {
ConsumerCommand::OnSubscribe { producer } => {
if !self.shared.set_producer(producer.clone()) {
let _ = cast_producer_remote_failure(
&producer,
_myself.clone(),
#[cfg(feature = "cluster")]
self.endpoint.as_ref(),
StreamError::Failed(
"stream ref was already subscribed by another endpoint".to_owned(),
),
);
}
}
#[cfg(feature = "cluster")]
ConsumerCommand::OnSubscribeRemote { producer } => {
match resolve_remote_actor(&producer, self.settings.subscription_timeout) {
Ok(producer) => {
if !self.shared.set_producer(producer.clone()) {
let _ = cast_producer_remote_failure(
&producer,
_myself.clone(),
self.endpoint.as_ref(),
StreamError::Failed(
"stream ref was already subscribed by another endpoint"
.to_owned(),
),
);
}
}
Err(error) => self.shared.fail_local(error),
}
}
ConsumerCommand::Element {
producer,
seq,
item,
} => self.shared.push(&producer, seq, item),
#[cfg(feature = "cluster")]
ConsumerCommand::ElementRemote {
producer,
seq,
item,
} => {
if let Ok(producer) =
resolve_remote_actor(&producer, self.settings.subscription_timeout)
{
self.shared.push(&producer, seq, item);
}
}
ConsumerCommand::Complete { producer, seq } => {
self.shared.complete(&producer, seq);
let _ = cast_actor(&producer, ProducerCommand::Ack);
}
#[cfg(feature = "cluster")]
ConsumerCommand::CompleteRemote { producer, seq } => {
if let Ok(producer) =
resolve_remote_actor(&producer, self.settings.subscription_timeout)
{
self.shared.complete(&producer, seq);
let _ = cast_actor(&producer, ProducerCommand::Ack);
}
}
ConsumerCommand::Failure { producer, error } => {
self.shared.fail(&producer, error);
let _ = cast_actor(&producer, ProducerCommand::Ack);
}
#[cfg(feature = "cluster")]
ConsumerCommand::FailureRemote { producer, error } => {
if let Ok(producer) =
resolve_remote_actor(&producer, self.settings.subscription_timeout)
{
self.shared.fail(&producer, error);
let _ = cast_actor(&producer, ProducerCommand::Ack);
}
}
}
Ok(())
}
async fn post_stop(
&self,
_myself: ActorRef<Self::Msg>,
_state: &mut Self::State,
) -> ActorResult {
self.shared.fail_local(StreamError::ActorTerminated);
Ok(())
}
}
#[allow(dead_code)]
struct ConsumerStream<T>
where
T: stream_ref_actor_bound::Bound + Send + 'static,
{
shared: Arc<ConsumerShared<T>>,
actor_ref: Option<ActorRef<ConsumerCommand<T>>>,
#[cfg(feature = "cluster")]
endpoint: Option<RemoteStreamRefEndpoint>,
settings: StreamRefSettings,
terminated: bool,
source_ref_keep_alive: Option<Arc<SourceRefInner<T>>>,
}
impl<T> Iterator for ConsumerStream<T>
where
T: stream_ref_actor_bound::Bound + Send + 'static,
{
type Item = StreamResult<T>;
fn next(&mut self) -> Option<Self::Item> {
if self.terminated {
return None;
}
if let Err(error) = self.wait_for_subscription() {
self.terminated = true;
return Some(Err(error));
}
if let Err(error) = self.redeliver_or_extend_demand() {
self.terminated = true;
return Some(Err(error));
}
let mut next_redelivery = next_redelivery_deadline(self.settings);
loop {
let demand_after_pop = {
let mut inner = self.shared.lock();
if let Some(item) = inner.queue.pop_front() {
inner.delivered = inner.delivered.saturating_add(1);
let demand = next_demand(&mut inner, self.settings);
drop(inner);
if let Some((producer, cumulative)) = demand
&& let Err(error) = send_demand(
&self.shared,
&self.actor_ref,
#[cfg(feature = "cluster")]
&self.endpoint,
&producer,
cumulative,
)
{
self.terminated = true;
return Some(Err(error));
}
return Some(Ok(item));
}
if let Some(terminal) = inner.terminal.clone() {
drop(inner);
match terminal {
ConsumerTerminal::Complete => {
self.terminated = true;
self.stop_actor();
return None;
}
ConsumerTerminal::Error(error) => {
self.terminated = true;
self.stop_actor();
return Some(Err(error));
}
}
}
None::<(ProducerEndpoint<T>, u64)>
};
debug_assert!(demand_after_pop.is_none());
let now = Instant::now();
let timeout = next_redelivery.saturating_duration_since(now);
let mut inner = self.shared.lock();
if !inner.queue.is_empty() || inner.terminal.is_some() {
continue;
}
let (next_inner, result) = wait_timeout_unpoison(
&self.shared.changed,
inner,
timeout.min(STREAM_REF_WAIT_POLL),
);
inner = next_inner;
drop(inner);
if result.timed_out() && Instant::now() >= next_redelivery {
if let Err(error) = self.redeliver_demand() {
self.terminated = true;
return Some(Err(error));
}
next_redelivery = next_redelivery_deadline(self.settings);
}
}
}
}
#[allow(dead_code)]
impl<T> ConsumerStream<T>
where
T: stream_ref_actor_bound::Bound + Send + 'static,
{
fn wait_for_subscription(&self) -> StreamResult<()> {
let deadline = Instant::now()
.checked_add(self.settings.subscription_timeout)
.unwrap_or_else(far_future);
let mut inner = self.shared.lock();
loop {
if inner.producer.is_some() {
return Ok(());
}
if let Some(terminal) = inner.terminal.clone() {
return match terminal {
ConsumerTerminal::Complete => Ok(()),
ConsumerTerminal::Error(error) => Err(error),
};
}
let now = Instant::now();
if now >= deadline {
drop(inner);
let error = subscription_timeout_error("stream ref source");
self.shared.fail_local(error.clone());
self.stop_actor_ref();
return Err(error);
}
let remaining = deadline.saturating_duration_since(now);
let (next, _) = wait_timeout_unpoison(
&self.shared.changed,
inner,
remaining.min(STREAM_REF_WAIT_POLL),
);
inner = next;
}
}
fn redeliver_or_extend_demand(&self) -> StreamResult<()> {
let demand = {
let mut inner = self.shared.lock();
next_demand(&mut inner, self.settings)
};
if let Some((producer, cumulative)) = demand {
send_demand(
&self.shared,
&self.actor_ref,
#[cfg(feature = "cluster")]
&self.endpoint,
&producer,
cumulative,
)?;
}
Ok(())
}
fn redeliver_demand(&self) -> StreamResult<()> {
let (producer, cumulative) = {
let inner = self.shared.lock();
let Some(producer) = inner.producer.clone() else {
return Ok(());
};
if inner.cumulative_demand == 0 {
return Ok(());
}
(producer, inner.cumulative_demand)
};
send_demand(
&self.shared,
&self.actor_ref,
#[cfg(feature = "cluster")]
&self.endpoint,
&producer,
cumulative,
)
}
fn stop_actor(&mut self) {
if let Some(actor_ref) = self.actor_ref.take() {
actor_ref.stop(None);
}
}
fn stop_actor_ref(&self) {
if let Some(actor_ref) = &self.actor_ref {
actor_ref.stop(None);
}
}
}
impl<T> Drop for ConsumerStream<T>
where
T: stream_ref_actor_bound::Bound + Send + 'static,
{
fn drop(&mut self) {
if !self.terminated {
let producer = self.shared.lock().producer.clone();
match producer {
Some(ProducerEndpoint::Actor(producer)) => {
if let Some(consumer) = &self.actor_ref {
let _ = cast_producer_cancel(
&producer,
consumer.clone(),
#[cfg(feature = "cluster")]
self.endpoint.as_ref(),
);
}
}
Some(ProducerEndpoint::Direct(producer)) => {
if let Some(producer) = producer.upgrade() {
producer.stop_from_direct_consumer(&self.shared, StreamError::Cancelled);
}
}
None => {}
}
}
self.stop_actor();
drop(self.source_ref_keep_alive.take());
}
}
#[allow(dead_code)]
fn next_demand<T>(
inner: &mut ConsumerInner<T>,
settings: StreamRefSettings,
) -> Option<(ProducerEndpoint<T>, u64)> {
if inner.terminal.is_some() {
return None;
}
let remaining_credit = inner.cumulative_demand.saturating_sub(inner.delivered);
if inner.cumulative_demand != 0 && remaining_credit > demand_replenish_threshold(settings) {
return None;
}
let target = inner
.delivered
.saturating_add(settings.buffer_capacity as u64);
if inner.cumulative_demand >= target {
return None;
}
inner.cumulative_demand = target;
inner
.producer
.as_ref()
.map(|producer| (producer.clone(), inner.cumulative_demand))
}
#[allow(dead_code)]
fn send_demand<T>(
consumer_shared: &Arc<ConsumerShared<T>>,
consumer: &Option<ActorRef<ConsumerCommand<T>>>,
#[cfg(feature = "cluster")] consumer_endpoint: &Option<RemoteStreamRefEndpoint>,
producer: &ProducerEndpoint<T>,
cumulative: u64,
) -> StreamResult<()>
where
T: stream_ref_actor_bound::Bound + Send + 'static,
{
match producer {
ProducerEndpoint::Actor(producer) => {
let Some(consumer) = consumer else {
return Err(StreamError::ActorTerminated);
};
cast_producer_demand(
producer,
consumer.clone(),
#[cfg(feature = "cluster")]
consumer_endpoint.as_ref(),
cumulative,
)
}
ProducerEndpoint::Direct(producer) => {
let Some(producer) = producer.upgrade() else {
return Err(StreamError::ActorTerminated);
};
producer.update_direct_demand(consumer_shared, cumulative);
Ok(())
}
}
}
#[allow(dead_code)]
fn demand_replenish_threshold(settings: StreamRefSettings) -> u64 {
(settings.buffer_capacity as u64) / 2
}
#[cfg(feature = "cluster")]
fn materialize_remote_source_ref<T>(
producer_endpoint: RemoteStreamRefEndpoint,
settings: StreamRefSettings,
keep_alive: Arc<SourceRefInner<T>>,
) -> StreamResult<BoxStream<T>>
where
T: BytesConvertable + Send + 'static,
{
let producer = resolve_remote_actor(&producer_endpoint, settings.subscription_timeout)?;
let shared = Arc::new(ConsumerShared::new(settings));
let consumer_endpoint = RemoteStreamRefEndpoint::new("consumer");
let (consumer, _handle) = spawn_consumer_actor(
None,
Arc::clone(&shared),
settings,
Some(consumer_endpoint.clone()),
)?;
join_remote_endpoint(&consumer_endpoint, &consumer);
cast_producer_subscribe(&producer, consumer.clone(), Some(&consumer_endpoint))?;
Ok(Box::new(ConsumerStream {
shared,
actor_ref: Some(consumer),
endpoint: Some(consumer_endpoint),
settings,
terminated: false,
source_ref_keep_alive: Some(keep_alive),
}) as BoxStream<T>)
}
#[cfg(feature = "cluster")]
fn materialize_remote_sink_ref<T>(
consumer_endpoint: RemoteStreamRefEndpoint,
settings: StreamRefSettings,
input: BoxStream<T>,
) -> StreamResult<StreamCompletion<NotUsed>>
where
T: BytesConvertable + Send + 'static,
{
let consumer = resolve_remote_actor(&consumer_endpoint, settings.subscription_timeout)?;
let shared = Arc::new(ProducerShared::new());
let producer_endpoint = RemoteStreamRefEndpoint::new("producer");
let (producer, _handle) = spawn_producer_actor(
None,
Arc::clone(&shared),
settings,
Some(producer_endpoint.clone()),
)?;
join_remote_endpoint(&producer_endpoint, &producer);
register_producer_consumer(
&shared,
producer.clone(),
consumer,
Some(&producer_endpoint),
);
let cancelled = StreamCancellation::for_external_completion();
let cancel_flag = cancelled.cancelled();
let guard_cancel_flag = Arc::clone(&cancel_flag);
let (sender, receiver) = oneshot::channel();
let guard_producer = producer.clone();
let endpoint_handle = ProducerEndpointHandle::Actor {
actor: producer,
shared: Arc::clone(&shared),
endpoint: Some(producer_endpoint),
};
let guard = ProducerEndpointDropGuard::new(guard_producer, Some(guard_cancel_flag));
std::thread::spawn(move || {
let _guard = guard;
let result = run_producer_endpoint(input, shared, endpoint_handle, settings, cancel_flag);
let _ = sender.send(result);
});
Ok(StreamCompletion::from_receiver(receiver, Some(cancelled)))
}
fn stream_ref_source_sink<T>(settings: StreamRefSettings) -> Sink<T, SourceRef<T>>
where
T: Send + 'static,
{
Sink::from_runner(move |input, materializer| {
let direct = Arc::new(SourceRefDirectState::new(input));
let direct_for_timeout = Arc::clone(&direct);
let timeout = materializer.schedule_once(settings.subscription_timeout, move || {
direct_for_timeout.timeout_if_unsubscribed();
});
Ok(SourceRef {
inner: Arc::new(SourceRefInner {
producer: LazyProducerStatus::new(Arc::clone(&direct)),
direct,
settings,
subscribed: AtomicBool::new(false),
timeout: Some(timeout),
#[cfg(feature = "cluster")]
remote: None,
}),
})
})
}
fn stream_ref_sink_source<T>(settings: StreamRefSettings) -> Source<T, SinkRef<T>>
where
T: Send + 'static,
{
Source::from_materialized_factory(move |_materializer| {
let direct = Arc::new(SinkRefDirectState::new());
let sink_ref = SinkRef {
inner: Arc::new(SinkRefInner {
direct: Arc::clone(&direct),
#[cfg(feature = "cluster")]
settings,
subscribed: AtomicBool::new(false),
#[cfg(feature = "cluster")]
remote: None,
}),
};
Ok((
Box::new(SinkRefDirectStream::new(direct, settings)) as BoxStream<T>,
sink_ref,
))
})
}
#[allow(dead_code)]
fn send_element_to_consumer<T>(
consumer: &ConsumerEndpoint<T>,
producer: &ProducerEndpointHandle<T>,
seq: u64,
item: T,
) -> StreamResult<()>
where
T: stream_ref_actor_bound::Bound + Send + 'static,
{
match consumer {
ConsumerEndpoint::Actor(consumer) => {
let Some(producer_actor) = producer.actor_ref() else {
return Err(StreamError::ActorTerminated);
};
#[cfg(feature = "cluster")]
if is_remote_actor(consumer) {
let Some(producer_endpoint) = producer.endpoint() else {
return Err(StreamError::ActorTerminated);
};
return cast_actor(
consumer,
ConsumerCommand::ElementRemote {
producer: producer_endpoint,
seq,
item,
},
);
}
cast_actor(
consumer,
ConsumerCommand::Element {
producer: producer_actor,
seq,
item,
},
)
}
ConsumerEndpoint::Direct(consumer) => {
let Some(consumer) = consumer.upgrade() else {
return Err(StreamError::Cancelled);
};
let Some(producer) = producer.direct_shared() else {
return Err(StreamError::ActorTerminated);
};
consumer.push_direct(&producer, seq, item);
Ok(())
}
}
}
#[allow(dead_code)]
fn fail_consumer<T>(
consumer: &ConsumerEndpoint<T>,
producer: &ProducerEndpointHandle<T>,
error: StreamError,
) -> StreamResult<bool>
where
T: stream_ref_actor_bound::Bound + Send + 'static,
{
match consumer {
ConsumerEndpoint::Actor(consumer) => {
let Some(producer_actor) = producer.actor_ref() else {
return Err(StreamError::ActorTerminated);
};
#[cfg(feature = "cluster")]
if is_remote_actor(consumer) {
let Some(producer_endpoint) = producer.endpoint() else {
return Err(StreamError::ActorTerminated);
};
cast_actor(
consumer,
ConsumerCommand::FailureRemote {
producer: producer_endpoint,
error,
},
)?;
return Ok(false);
}
cast_actor(
consumer,
ConsumerCommand::Failure {
producer: producer_actor,
error,
},
)?;
Ok(false)
}
ConsumerEndpoint::Direct(consumer) => {
let Some(consumer) = consumer.upgrade() else {
return Err(StreamError::Cancelled);
};
let Some(producer) = producer.direct_shared() else {
return Err(StreamError::ActorTerminated);
};
consumer.fail_direct(&producer, error);
Ok(true)
}
}
}
#[allow(dead_code)]
fn complete_consumer<T>(
consumer: &ConsumerEndpoint<T>,
producer: &ProducerEndpointHandle<T>,
seq: u64,
) -> StreamResult<bool>
where
T: stream_ref_actor_bound::Bound + Send + 'static,
{
match consumer {
ConsumerEndpoint::Actor(consumer) => {
let Some(producer_actor) = producer.actor_ref() else {
return Err(StreamError::ActorTerminated);
};
#[cfg(feature = "cluster")]
if is_remote_actor(consumer) {
let Some(producer_endpoint) = producer.endpoint() else {
return Err(StreamError::ActorTerminated);
};
cast_actor(
consumer,
ConsumerCommand::CompleteRemote {
producer: producer_endpoint,
seq,
},
)?;
return Ok(false);
}
cast_actor(
consumer,
ConsumerCommand::Complete {
producer: producer_actor,
seq,
},
)?;
Ok(false)
}
ConsumerEndpoint::Direct(consumer) => {
let Some(consumer) = consumer.upgrade() else {
return Err(StreamError::Cancelled);
};
let Some(producer) = producer.direct_shared() else {
return Err(StreamError::ActorTerminated);
};
consumer.complete_direct(&producer, seq);
Ok(true)
}
}
}
#[allow(dead_code)]
fn run_producer_endpoint<T>(
mut input: BoxStream<T>,
shared: Arc<ProducerShared<T>>,
producer_endpoint: ProducerEndpointHandle<T>,
settings: StreamRefSettings,
cancelled: Arc<AtomicBool>,
) -> StreamResult<NotUsed>
where
T: stream_ref_actor_bound::Bound + Send + 'static,
{
let deadline = Instant::now()
.checked_add(settings.subscription_timeout)
.unwrap_or_else(far_future);
loop {
let consumer = match wait_for_remote_demand(&shared, deadline, &cancelled) {
Ok(consumer) => consumer,
Err(error) => {
producer_endpoint.stop_actor();
return Err(error);
}
};
if cancelled.load(Ordering::SeqCst) {
return Err(StreamError::Cancelled);
}
match input.next() {
Some(Ok(item)) => {
let seq = {
let mut inner = shared.lock();
let seq = inner.sent;
inner.sent = inner.sent.saturating_add(1);
seq
};
if let Err(error) =
send_element_to_consumer(&consumer, &producer_endpoint, seq, item)
{
producer_endpoint.stop_actor();
return Err(match error {
StreamError::ActorTerminated => StreamError::Cancelled,
other => other,
});
}
}
Some(Err(error)) => {
if fail_consumer(&consumer, &producer_endpoint, error.clone()).unwrap_or(false) {
producer_endpoint.stop_actor();
}
return Err(error);
}
None => {
let seq = shared.lock().sent;
if complete_consumer(&consumer, &producer_endpoint, seq).unwrap_or(false) {
producer_endpoint.stop_actor();
}
return Ok(NotUsed);
}
}
}
}
#[allow(dead_code)]
fn wait_for_remote_demand<T>(
shared: &Arc<ProducerShared<T>>,
deadline: Instant,
cancelled: &Arc<AtomicBool>,
) -> StreamResult<ConsumerEndpoint<T>>
where
T: stream_ref_actor_bound::Bound + Send + 'static,
{
let mut inner = shared.lock();
loop {
if cancelled.load(Ordering::SeqCst) {
return Err(StreamError::Cancelled);
}
if let Some(error) = inner.stopped.clone() {
return Err(error);
}
if let Some(consumer) = &inner.consumer
&& inner.sent < inner.cumulative_demand
{
return Ok(consumer.clone());
}
let now = Instant::now();
if inner.consumer.is_none() && now >= deadline {
return Err(subscription_timeout_error("stream ref sink"));
}
let remaining = deadline.saturating_duration_since(now);
let timeout = if inner.consumer.is_none() {
remaining.min(STREAM_REF_WAIT_POLL)
} else {
STREAM_REF_WAIT_POLL
};
let (next, _) = wait_timeout_unpoison(&shared.changed, inner, timeout);
inner = next;
}
}
#[allow(dead_code)]
fn spawn_producer_actor<T>(
initial_consumer: Option<ActorRef<ConsumerCommand<T>>>,
shared: Arc<ProducerShared<T>>,
settings: StreamRefSettings,
#[cfg(feature = "cluster")] endpoint: Option<RemoteStreamRefEndpoint>,
) -> StreamResult<(
ActorRef<ProducerCommand<T>>,
ractor::concurrency::JoinHandle<()>,
)>
where
T: stream_ref_actor_bound::Bound + Send + 'static,
{
block_on_ractor_runtime(Actor::spawn(
None,
ProducerActor {
shared,
initial_consumer,
settings,
#[cfg(feature = "cluster")]
endpoint,
},
(),
))?
.map_err(|error| {
StreamError::Failed(format!(
"stream ref producer actor failed to spawn: {error}"
))
})
}
#[allow(dead_code)]
fn spawn_consumer_actor<T>(
initial_producer: Option<ActorRef<ProducerCommand<T>>>,
shared: Arc<ConsumerShared<T>>,
settings: StreamRefSettings,
#[cfg(feature = "cluster")] endpoint: Option<RemoteStreamRefEndpoint>,
) -> StreamResult<(
ActorRef<ConsumerCommand<T>>,
ractor::concurrency::JoinHandle<()>,
)>
where
T: stream_ref_actor_bound::Bound + Send + 'static,
{
block_on_ractor_runtime(Actor::spawn(
None,
ConsumerActor {
shared,
initial_producer,
settings,
#[cfg(feature = "cluster")]
endpoint,
},
(),
))?
.map_err(|error| {
StreamError::Failed(format!(
"stream ref consumer actor failed to spawn: {error}"
))
})
}
#[allow(dead_code)]
fn cast_producer_subscribe<T>(
producer: &ActorRef<ProducerCommand<T>>,
consumer: ActorRef<ConsumerCommand<T>>,
#[cfg(feature = "cluster")] consumer_endpoint: Option<&RemoteStreamRefEndpoint>,
) -> StreamResult<()>
where
T: stream_ref_actor_bound::Bound + Send + 'static,
{
#[cfg(feature = "cluster")]
if is_remote_actor(producer) {
let Some(consumer_endpoint) = consumer_endpoint else {
return Err(StreamError::ActorTerminated);
};
return cast_actor(
producer,
ProducerCommand::SubscribeRemote {
consumer: consumer_endpoint.clone(),
},
);
}
cast_actor(producer, ProducerCommand::Subscribe { consumer })
}
#[allow(dead_code)]
fn cast_producer_demand<T>(
producer: &ActorRef<ProducerCommand<T>>,
consumer: ActorRef<ConsumerCommand<T>>,
#[cfg(feature = "cluster")] consumer_endpoint: Option<&RemoteStreamRefEndpoint>,
cumulative: u64,
) -> StreamResult<()>
where
T: stream_ref_actor_bound::Bound + Send + 'static,
{
#[cfg(feature = "cluster")]
if is_remote_actor(producer) {
let Some(consumer_endpoint) = consumer_endpoint else {
return Err(StreamError::ActorTerminated);
};
return cast_actor(
producer,
ProducerCommand::DemandRemote {
consumer: consumer_endpoint.clone(),
cumulative,
},
);
}
cast_actor(
producer,
ProducerCommand::Demand {
consumer,
cumulative,
},
)
}
#[allow(dead_code)]
fn cast_producer_cancel<T>(
producer: &ActorRef<ProducerCommand<T>>,
consumer: ActorRef<ConsumerCommand<T>>,
#[cfg(feature = "cluster")] consumer_endpoint: Option<&RemoteStreamRefEndpoint>,
) -> StreamResult<()>
where
T: stream_ref_actor_bound::Bound + Send + 'static,
{
#[cfg(feature = "cluster")]
if is_remote_actor(producer) {
let Some(consumer_endpoint) = consumer_endpoint else {
return Err(StreamError::ActorTerminated);
};
return cast_actor(
producer,
ProducerCommand::CancelRemote {
consumer: consumer_endpoint.clone(),
},
);
}
cast_actor(producer, ProducerCommand::Cancel { consumer })
}
#[allow(dead_code)]
fn cast_producer_remote_failure<T>(
producer: &ActorRef<ProducerCommand<T>>,
consumer: ActorRef<ConsumerCommand<T>>,
#[cfg(feature = "cluster")] consumer_endpoint: Option<&RemoteStreamRefEndpoint>,
error: StreamError,
) -> StreamResult<()>
where
T: stream_ref_actor_bound::Bound + Send + 'static,
{
#[cfg(feature = "cluster")]
if is_remote_actor(producer) {
let Some(consumer_endpoint) = consumer_endpoint else {
return Err(StreamError::ActorTerminated);
};
return cast_actor(
producer,
ProducerCommand::RemoteFailureRemote {
consumer: consumer_endpoint.clone(),
error,
},
);
}
cast_actor(producer, ProducerCommand::RemoteFailure { consumer, error })
}
#[allow(dead_code)]
fn cast_consumer_on_subscribe<T>(
consumer: &ActorRef<ConsumerCommand<T>>,
producer: ActorRef<ProducerCommand<T>>,
#[cfg(feature = "cluster")] producer_endpoint: Option<&RemoteStreamRefEndpoint>,
) -> StreamResult<()>
where
T: stream_ref_actor_bound::Bound + Send + 'static,
{
#[cfg(feature = "cluster")]
if is_remote_actor(consumer) {
let Some(producer_endpoint) = producer_endpoint else {
return Err(StreamError::ActorTerminated);
};
return cast_actor(
consumer,
ConsumerCommand::OnSubscribeRemote {
producer: producer_endpoint.clone(),
},
);
}
cast_actor(consumer, ConsumerCommand::OnSubscribe { producer })
}
#[allow(dead_code)]
fn cast_consumer_failure<T>(
consumer: &ActorRef<ConsumerCommand<T>>,
producer: ActorRef<ProducerCommand<T>>,
#[cfg(feature = "cluster")] producer_endpoint: Option<&RemoteStreamRefEndpoint>,
error: StreamError,
) -> StreamResult<()>
where
T: stream_ref_actor_bound::Bound + Send + 'static,
{
#[cfg(feature = "cluster")]
if is_remote_actor(consumer) {
let Some(producer_endpoint) = producer_endpoint else {
return Err(StreamError::ActorTerminated);
};
return cast_actor(
consumer,
ConsumerCommand::FailureRemote {
producer: producer_endpoint.clone(),
error,
},
);
}
cast_actor(consumer, ConsumerCommand::Failure { producer, error })
}
#[allow(dead_code)]
fn cast_actor<Msg>(actor_ref: &ActorRef<Msg>, message: Msg) -> StreamResult<()>
where
Msg: Message,
{
match actor_ref.cast(message) {
Ok(()) => Ok(()),
Err(ractor::MessagingErr::SendErr(_)) | Err(ractor::MessagingErr::ChannelClosed) => {
Err(StreamError::ActorTerminated)
}
Err(error) => Err(StreamError::ActorAskSendFailed {
reason: error.to_string(),
}),
}
}
#[cfg(feature = "cluster")]
fn is_remote_actor<Msg>(actor_ref: &ActorRef<Msg>) -> bool {
!actor_ref.get_id().is_local()
}
#[cfg(feature = "cluster")]
fn join_remote_endpoint<Msg>(endpoint: &RemoteStreamRefEndpoint, actor_ref: &ActorRef<Msg>) {
ractor::pg::join_scoped(
endpoint.scope.clone(),
endpoint.group.clone(),
vec![actor_ref.get_cell()],
);
}
#[cfg(feature = "cluster")]
fn resolve_remote_actor<Msg>(
endpoint: &RemoteStreamRefEndpoint,
timeout: Duration,
) -> StreamResult<ActorRef<Msg>>
where
Msg: Message,
{
let deadline = Instant::now()
.checked_add(timeout)
.unwrap_or_else(far_future);
loop {
if let Some(actor) = resolve_remote_actor_once(endpoint) {
return Ok(actor);
}
if Instant::now() >= deadline {
return Err(subscription_timeout_error("stream ref endpoint"));
}
std::thread::park_timeout(STREAM_REF_WAIT_POLL);
}
}
#[cfg(feature = "cluster")]
fn resolve_remote_actor_once<Msg>(endpoint: &RemoteStreamRefEndpoint) -> Option<ActorRef<Msg>>
where
Msg: Message,
{
ractor::pg::get_scoped_members(&endpoint.scope, &endpoint.group)
.into_iter()
.next()
.map(ActorCell::into)
}
#[allow(dead_code)]
fn same_actor<MsgA, MsgB>(left: &ActorRef<MsgA>, right: &ActorRef<MsgB>) -> bool
where
MsgA: Message,
MsgB: Message,
{
left.get_cell().get_id() == right.get_cell().get_id()
}
fn failed_once<T>(reason: &str) -> BoxStream<T>
where
T: stream_ref_actor_bound::Bound + Send + 'static,
{
failed_stream(StreamError::Failed(reason.to_owned()))
}
fn failed_stream<T>(error: StreamError) -> BoxStream<T>
where
T: Send + 'static,
{
Box::new(std::iter::once(Err(error)))
}
fn subscription_timeout_error(side: &str) -> StreamError {
StreamError::Failed(format!(
"{side} remote side did not subscribe within subscription timeout"
))
}
#[allow(dead_code)]
fn invalid_sequence_error(expected: u64, got: u64, context: &str) -> StreamError {
StreamError::Failed(format!(
"{context}: expected sequence {expected}, got {got}"
))
}
#[allow(dead_code)]
fn next_redelivery_deadline(settings: StreamRefSettings) -> Instant {
Instant::now()
.checked_add(settings.demand_redelivery_interval)
.unwrap_or_else(far_future)
}
fn far_future() -> Instant {
Instant::now() + Duration::from_secs(60 * 60 * 24 * 365)
}
fn wait_timeout_unpoison<'a, T>(
condvar: &Condvar,
guard: MutexGuard<'a, T>,
timeout: Duration,
) -> (MutexGuard<'a, T>, std::sync::WaitTimeoutResult) {
condvar
.wait_timeout(guard, timeout)
.unwrap_or_else(|poison| poison.into_inner())
}
#[cfg(feature = "cluster")]
fn put_u8(bytes: &mut Vec<u8>, value: u8) {
bytes.push(value);
}
#[cfg(feature = "cluster")]
fn take_u8(bytes: &[u8], cursor: &mut usize) -> u8 {
let value = bytes.get(*cursor).copied().unwrap_or_default();
*cursor = (*cursor).saturating_add(1);
value
}
#[cfg(feature = "cluster")]
fn put_u32(bytes: &mut Vec<u8>, value: u32) {
bytes.extend(value.to_be_bytes());
}
#[cfg(feature = "cluster")]
fn take_u32(bytes: &[u8], cursor: &mut usize) -> u32 {
let mut value = [0_u8; 4];
if let Some(slice) = take_exact(bytes, cursor, value.len()) {
value.copy_from_slice(slice);
}
u32::from_be_bytes(value)
}
#[cfg(feature = "cluster")]
fn put_u64(bytes: &mut Vec<u8>, value: u64) {
bytes.extend(value.to_be_bytes());
}
#[cfg(feature = "cluster")]
fn take_u64(bytes: &[u8], cursor: &mut usize) -> u64 {
let mut value = [0_u8; 8];
if let Some(slice) = take_exact(bytes, cursor, value.len()) {
value.copy_from_slice(slice);
}
u64::from_be_bytes(value)
}
#[cfg(feature = "cluster")]
fn put_bytes(bytes: &mut Vec<u8>, value: Vec<u8>) {
put_u64(bytes, value.len() as u64);
bytes.extend(value);
}
#[cfg(feature = "cluster")]
fn remote_cast(variant: &str, fields: Vec<Vec<u8>>) -> ractor::message::SerializedMessage {
let mut args = Vec::new();
for field in fields {
put_bytes(&mut args, field);
}
ractor::message::SerializedMessage::Cast {
variant: variant.to_owned(),
args,
metadata: None,
}
}
#[cfg(feature = "cluster")]
fn take_bytes(bytes: &[u8], cursor: &mut usize) -> Vec<u8> {
let len = take_u64(bytes, cursor) as usize;
take_exact(bytes, cursor, len).unwrap_or_default().to_vec()
}
#[cfg(feature = "cluster")]
fn put_string(bytes: &mut Vec<u8>, value: String) {
put_bytes(bytes, value.into_bytes());
}
#[cfg(feature = "cluster")]
fn take_string(bytes: &[u8], cursor: &mut usize) -> String {
String::from_utf8(take_bytes(bytes, cursor)).unwrap_or_default()
}
#[cfg(feature = "cluster")]
fn put_duration(bytes: &mut Vec<u8>, value: Duration) {
put_u64(bytes, value.as_secs());
put_u32(bytes, value.subsec_nanos());
}
#[cfg(feature = "cluster")]
fn take_duration(bytes: &[u8], cursor: &mut usize) -> Duration {
Duration::new(take_u64(bytes, cursor), take_u32(bytes, cursor))
}
#[cfg(feature = "cluster")]
fn take_exact<'a>(bytes: &'a [u8], cursor: &mut usize, len: usize) -> Option<&'a [u8]> {
let end = cursor.checked_add(len)?;
let value = bytes.get(*cursor..end)?;
*cursor = end;
Some(value)
}
#[cfg(feature = "cluster")]
fn remote_port_operation(value: String) -> &'static str {
match value.as_str() {
"abort_emitting" => "abort_emitting",
"abort_reading" => "abort_reading",
"complete" => "complete",
"fail" => "fail",
"grab" => "grab",
"offer" => "offer",
"pull" => "pull",
"push" => "push",
"read_n" => "read_n",
"request" => "request",
"set_handler" => "set_handler",
"set_out_handler" => "set_out_handler",
_ => "remote",
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
stream::{Keep, Source},
testkit::TestSink,
};
use std::sync::{
Arc as StdArc,
atomic::{AtomicBool, AtomicUsize, Ordering},
};
fn wait_until(timeout: Duration, mut condition: impl FnMut() -> bool) -> bool {
let deadline = Instant::now() + timeout;
while Instant::now() < deadline {
if condition() {
return true;
}
std::thread::park_timeout(Duration::from_millis(1));
}
condition()
}
fn assert_condition_holds(timeout: Duration, mut condition: impl FnMut() -> bool) {
let deadline = Instant::now() + timeout;
while Instant::now() < deadline {
assert!(condition());
std::thread::park_timeout(Duration::from_millis(1));
}
assert!(condition());
}
fn short_settings() -> StreamRefSettings {
StreamRefSettings::default()
.with_buffer_capacity(1)
.with_subscription_timeout(Duration::from_millis(50))
.with_demand_redelivery_interval(Duration::from_millis(10))
}
#[test]
fn source_ref_streams_elements_and_completion() {
let source_ref = Source::from_iter(1_u64..=3)
.run_with(StreamRefs::source_ref())
.unwrap();
assert_eq!(source_ref.source().run_collect().unwrap(), vec![1, 2, 3]);
}
#[test]
fn sink_ref_streams_elements_and_completion() {
let (sink_ref, completion) = StreamRefs::sink_ref::<u64>()
.to_mat(Sink::collect(), Keep::both)
.run()
.unwrap();
Source::from_iter(1_u64..=3)
.run_with(sink_ref.sink())
.unwrap()
.wait()
.unwrap();
assert_eq!(completion.wait().unwrap(), vec![1, 2, 3]);
}
#[test]
fn source_ref_propagates_upstream_failure() {
let source_ref = Source::<u64>::failed(StreamError::Failed("boom".to_owned()))
.run_with(StreamRefs::source_ref())
.unwrap();
assert_eq!(
source_ref.source().run_collect(),
Err(StreamError::Failed("boom".to_owned()))
);
}
#[test]
fn sink_ref_propagates_upstream_failure() {
let (sink_ref, completion) = StreamRefs::sink_ref::<u64>()
.to_mat(Sink::collect(), Keep::both)
.run()
.unwrap();
let failure = Source::<u64>::failed(StreamError::Failed("remote boom".to_owned()))
.run_with(sink_ref.sink())
.unwrap()
.wait();
assert_eq!(failure, Err(StreamError::Failed("remote boom".to_owned())));
assert_eq!(
completion.wait(),
Err(StreamError::Failed("remote boom".to_owned()))
);
}
#[test]
fn source_ref_cancellation_reaches_origin() {
let closed = StdArc::new(AtomicBool::new(false));
let close_flag = StdArc::clone(&closed);
let source = Source::unfold_resource(
|| Ok(()),
|_state| Ok(Some(1_u64)),
move |_state| {
close_flag.store(true, Ordering::SeqCst);
Ok(())
},
);
let source_ref = source.run_with(StreamRefs::source_ref()).unwrap();
assert_eq!(source_ref.source().take(1).run_collect().unwrap(), vec![1]);
assert!(wait_until(Duration::from_secs(1), || {
closed.load(Ordering::SeqCst)
}));
}
#[test]
fn sink_ref_cancellation_stops_remote_producer() {
let (sink_ref, completion) = StreamRefs::sink_ref::<u64>()
.take(1)
.to_mat(Sink::collect(), Keep::both)
.run()
.unwrap();
let producer = Source::repeat(1_u64)
.run_with(sink_ref.sink())
.unwrap()
.wait();
assert_eq!(completion.wait().unwrap(), vec![1]);
assert_eq!(producer, Err(StreamError::Cancelled));
}
#[test]
fn source_ref_backpressures_across_ref() {
let pulled = StdArc::new(AtomicUsize::new(0));
let pulled_for_source = StdArc::clone(&pulled);
let source = Source::unfold(0_u64, move |next| {
pulled_for_source.fetch_add(1, Ordering::SeqCst);
Some((next + 1, next))
});
let source_ref = source
.run_with(StreamRefs::source_ref_with_settings(short_settings()))
.unwrap();
let mut probe = source_ref.source().run_with(TestSink::probe()).unwrap();
probe.request(1);
probe.assert_next(0);
assert!(wait_until(Duration::from_secs(1), || {
pulled.load(Ordering::SeqCst) >= 2
}));
assert_condition_holds(Duration::from_millis(50), || {
pulled.load(Ordering::SeqCst) <= 2
});
probe.request(1);
probe.assert_next(1);
assert!(wait_until(Duration::from_secs(1), || {
pulled.load(Ordering::SeqCst) >= 3
}));
assert_condition_holds(Duration::from_millis(50), || {
pulled.load(Ordering::SeqCst) <= 3
});
probe.cancel();
}
#[test]
fn source_ref_late_subscription_observes_timeout() {
let source_ref = Source::repeat(1_u64)
.run_with(StreamRefs::source_ref_with_settings(short_settings()))
.unwrap();
assert!(wait_until(Duration::from_secs(1), || {
source_ref.inner.producer.get_status() == ractor::ActorStatus::Stopped
}));
let result = source_ref.source().run_collect();
assert!(matches!(
result,
Err(StreamError::ActorTerminated) | Err(StreamError::Failed(_))
));
}
#[test]
fn sink_ref_subscription_timeout_fails_local_source() {
let (_sink_ref, probe) = StreamRefs::sink_ref_with_settings::<u64>(short_settings())
.to_mat(TestSink::probe(), Keep::both)
.run()
.unwrap();
probe.request(1);
let error = probe.expect_error();
assert!(
matches!(error, StreamError::Failed(message) if message.contains("did not subscribe"))
);
}
#[test]
fn stream_refs_are_one_shot() {
let source_ref = Source::from_iter([1_u64])
.run_with(StreamRefs::source_ref())
.unwrap();
assert_eq!(source_ref.source().run_collect().unwrap(), vec![1]);
assert!(matches!(
source_ref.source().run_collect(),
Err(StreamError::Failed(message)) if message.contains("already")
));
let (sink_ref, completion) = StreamRefs::sink_ref::<u64>()
.to_mat(Sink::collect(), Keep::both)
.run()
.unwrap();
Source::single(1_u64)
.run_with(sink_ref.sink())
.unwrap()
.wait()
.unwrap();
assert!(matches!(
Source::single(2_u64).run_with(sink_ref.sink()).unwrap().wait(),
Err(StreamError::Failed(message)) if message.contains("already")
));
assert_eq!(completion.wait().unwrap(), vec![1]);
}
#[cfg(feature = "cluster")]
fn serialize_source_ref<T>(source_ref: SourceRef<T>) -> SourceRef<T>
where
T: BytesConvertable + Send + 'static,
{
SourceRef::from_bytes(source_ref.into_bytes())
}
#[cfg(feature = "cluster")]
fn serialize_sink_ref<T>(sink_ref: SinkRef<T>) -> SinkRef<T>
where
T: BytesConvertable + Send + 'static,
{
SinkRef::from_bytes(sink_ref.into_bytes())
}
#[cfg(feature = "cluster")]
#[test]
fn remote_source_ref_serialized_path_streams_elements_and_completion() {
let source_ref = Source::from_iter(1_u64..=3)
.run_with(StreamRefs::source_ref())
.unwrap();
let remote_ref = serialize_source_ref(source_ref);
assert_eq!(remote_ref.source().run_collect().unwrap(), vec![1, 2, 3]);
}
#[cfg(feature = "cluster")]
#[test]
fn remote_sink_ref_serialized_path_streams_elements_and_completion() {
let (sink_ref, completion) = StreamRefs::sink_ref::<u64>()
.to_mat(Sink::collect(), Keep::both)
.run()
.unwrap();
let remote_ref = serialize_sink_ref(sink_ref);
Source::from_iter(1_u64..=3)
.run_with(remote_ref.sink())
.unwrap()
.wait()
.unwrap();
assert_eq!(completion.wait().unwrap(), vec![1, 2, 3]);
}
#[cfg(feature = "cluster")]
#[test]
fn remote_source_ref_serialized_path_propagates_failure() {
let source_ref = Source::<u64>::failed(StreamError::Failed("remote boom".to_owned()))
.run_with(StreamRefs::source_ref())
.unwrap();
let remote_ref = serialize_source_ref(source_ref);
assert_eq!(
remote_ref.source().run_collect(),
Err(StreamError::Failed("remote boom".to_owned()))
);
}
#[cfg(feature = "cluster")]
#[test]
fn remote_sink_ref_serialized_path_propagates_failure() {
let (sink_ref, completion) = StreamRefs::sink_ref::<u64>()
.to_mat(Sink::collect(), Keep::both)
.run()
.unwrap();
let remote_ref = serialize_sink_ref(sink_ref);
let failure = Source::<u64>::failed(StreamError::Failed("sink boom".to_owned()))
.run_with(remote_ref.sink())
.unwrap()
.wait();
assert_eq!(failure, Err(StreamError::Failed("sink boom".to_owned())));
assert_eq!(
completion.wait(),
Err(StreamError::Failed("sink boom".to_owned()))
);
}
#[cfg(feature = "cluster")]
#[test]
fn remote_source_ref_serialized_path_cancellation_reaches_origin() {
let closed = StdArc::new(AtomicBool::new(false));
let close_flag = StdArc::clone(&closed);
let source = Source::unfold_resource(
|| Ok(()),
|_state| Ok(Some(1_u64)),
move |_state| {
close_flag.store(true, Ordering::SeqCst);
Ok(())
},
);
let source_ref = source.run_with(StreamRefs::source_ref()).unwrap();
let remote_ref = serialize_source_ref(source_ref);
assert_eq!(remote_ref.source().take(1).run_collect().unwrap(), vec![1]);
assert!(wait_until(Duration::from_secs(1), || {
closed.load(Ordering::SeqCst)
}));
}
#[cfg(feature = "cluster")]
#[test]
fn remote_source_ref_serialized_path_backpressures_across_ref() {
let pulled = StdArc::new(AtomicUsize::new(0));
let pulled_for_source = StdArc::clone(&pulled);
let source = Source::unfold(0_u64, move |next| {
pulled_for_source.fetch_add(1, Ordering::SeqCst);
Some((next + 1, next))
});
let source_ref = source
.run_with(StreamRefs::source_ref_with_settings(short_settings()))
.unwrap();
let remote_ref = serialize_source_ref(source_ref);
let mut probe = remote_ref.source().run_with(TestSink::probe()).unwrap();
probe.request(1);
probe.assert_next(0);
assert!(wait_until(Duration::from_secs(1), || {
pulled.load(Ordering::SeqCst) >= 1
}));
assert_condition_holds(Duration::from_secs(1), || {
pulled.load(Ordering::SeqCst) <= 2
});
probe.request(1);
probe.assert_next(1);
assert!(wait_until(Duration::from_secs(1), || {
pulled.load(Ordering::SeqCst) >= 2
}));
assert_condition_holds(Duration::from_secs(1), || {
pulled.load(Ordering::SeqCst) <= 3
});
probe.cancel();
}
#[cfg(feature = "cluster")]
#[test]
fn remote_sink_ref_serialized_path_producer_panic_surfaces_error() {
let (sink_ref, _completion) = StreamRefs::sink_ref::<u64>()
.to_mat(Sink::head(), Keep::both)
.run()
.unwrap();
let remote_sink = serialize_sink_ref(sink_ref);
let panicking = Source::unfold(0_u64, move |state| {
if state >= 5 {
panic!("intentional test panic in producer thread");
}
Some((state + 1, state))
});
let mut completion = panicking
.run_with(remote_sink.sink())
.expect("sink ref materializes");
let deadline = Instant::now() + Duration::from_secs(5);
loop {
if let Some(result) = completion.try_wait() {
assert!(
result.is_err(),
"panic in producer thread should surface error to consumer, got {:?}",
result
);
break;
}
assert!(
Instant::now() < deadline,
"consumer hung after producer panic (no result after 5s)"
);
std::thread::park_timeout(Duration::from_millis(10));
}
}
}