1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
use crate::connector::hook::IntoHook;
use crate::connector::{RuntimeCtx, Stream, StreamDescriptor, StreamRunner};
use crate::io::base::{BaseTx, TxPairExt};
use crate::utils::*;
use std::fmt::Debug;
use std::{sync::Arc, thread};
use uuid::Uuid;
use uuid::fmt::Simple;
#[derive(Debug)]
pub enum EventTxType<E> {
External(E),
Own,
}
impl<E> EventTxType<E> {
#[inline]
pub fn external(tx: E) -> Self {
Self::External(tx)
}
}
impl<E> Default for EventTxType<E> {
fn default() -> Self {
Self::Own
}
}
/// Blanket helper for spawning typed streams from a connector.
///
/// A `StreamSpawner` is implemented on top of a [`StreamRunner`].
/// It allocates channels, state, health flag, applies core pinning policy,
/// and starts the worker thread. The runner then executes its loop
/// inside that thread with a [`RuntimeCtx`].
pub trait StreamSpawner<D, E, R, S, T>
where
Self: StreamRunner<D, E, R, S, T>,
D: StreamDescriptor<T>,
S: StateMarker,
E: BaseTx + TxPairExt,
R: Reducer,
T: Debug + Clone + Send + 'static,
{
/// Build a human-readable thread name from descriptor and UUID.
fn thread_name(&self, desc: &D, id: Simple) -> String {
format!("{}-{}-{}", desc.venue(), desc.kind(), id)
}
/// Spawn a new worker stream given a descriptor and event hook.
///
/// - Builds config via [`StreamRunner::build_config`].
/// - Allocates action/event channels (bounded/unbounded from descriptor).
/// - Creates state cell and health flag.
/// - Applies core pinning policy if provided.
/// - Spawns the worker thread running [`StreamRunner::run`].
/// - Returns a [`Stream`] handle with action TX, optional event RX, state, cancel, and health.
fn spawn<H>(
&mut self,
desc: D,
event_tx_type: EventTxType<E>,
hook: H,
cancel: CancelToken,
core_stats: Option<Arc<CoreStats>>,
) -> anyhow::Result<Stream<Self::ActionTx, Option<E::RxHalf>, S>>
where
H: IntoHook<Self::RawEvent, E, R, S, D, Self::HookResult, T>,
{
// Per-stream config
let ctx = match self.build_config(&desc) {
Ok(ctx) => ctx,
Err(err) => {
tracing::error!(
"[{}-{}] failed to build stream config: {}",
desc.venue(),
desc.kind(),
err
);
return Err(err);
}
};
// Shared state + health
let state = StateCell::<S>::new_default();
let health = HealthFlag::new(desc.health_at_start());
// Channels: actions (model->worker)
let (action_tx, action_rx) = if let Some(bound) = desc.max_pending_actions() {
<Self::ActionTx as TxPairExt>::bound(bound)
} else {
<Self::ActionTx as TxPairExt>::unbound()
};
// Channels: events (worker->model)
let (event_tx, event_rx) = match event_tx_type {
EventTxType::Own => {
if let Some(bound) = desc.max_pending_events() {
let (tx, rx) = E::bound(bound);
(tx, Some(rx))
} else {
let (tx, rx) = E::unbound();
(tx, Some(rx))
}
}
EventTxType::External(tx) => (tx, None),
};
let reducer = R::default();
// Unique stream id
let stream_id = Uuid::new_v4().simple();
// Spawn worker thread
let handle = thread::Builder::new()
.name(self.thread_name(&desc, stream_id))
.spawn({
let rt_ctx = RuntimeCtx::new(
ctx,
desc,
action_rx,
event_tx,
reducer,
state.clone(),
cancel.clone(),
health.clone(),
);
let health = health.clone();
move || {
let mut _core_lease = None;
// Apply core pinning policy if any
if let Some(policy) = rt_ctx.desc.core_pick_policy() {
match (core_stats, policy.specific()) {
// Reserve/pin via CoreStats
(Some(cs), _) => {
let lease = cs.reserve(policy);
match try_pin_core(lease.core_id) {
Ok(core_id) => {
tracing::info!("pinned core {} successfully", core_id);
_core_lease = Some(lease); // keep guard alive
}
Err(err) => {
tracing::error!(
"failed to pin core {}: {}",
lease.core_id,
err
);
}
}
}
// Specific core requested but no CoreStats available
(None, Some(core_id)) => {
tracing::warn!(
"core pinning policy is set to specific, but no core stats available"
);
match try_pin_core(core_id) {
Ok(core_id) => {
tracing::info!("pinned core {} successfully", core_id);
}
Err(err) => {
tracing::error!(
"failed to pin core {}: {}",
core_id,
err
);
}
}
}
// Policy requires stats but none provided
(None, None) => {
tracing::warn!(
"core pinning policy requires core stats, but none are available"
);
}
}
}
// Run the worker
let res = Self::run(rt_ctx, hook);
health.down();
if let Err(err) = &res {
tracing::error!("stream {} failed: {}", stream_id, err);
}
res
}
})?;
// Wrap into a `Stream` handle
let stream = Stream::<Self::ActionTx, Option<E::RxHalf>, S>::new(
stream_id, handle, health, action_tx, event_rx, state, cancel,
);
Ok(stream)
}
}