//! Sync-session dispatch actor.
//!
//! Moves HashComparison/LevelWise sync sessions (both initiator and
//! responder sides) off the `NodeManager` arbiter and the
//! `SyncManager::start` select loop onto a dedicated `SyncSessionActor`
//! running on its own Arbiter (issue #2316, follow-up to #2299/#2293).
//!
//! ## Why this exists
//!
//! Before #2316, the responder ran on the `NodeManager` arbiter
//! (via `ctx.spawn` in `handlers/stream_opened.rs`) and the initiator
//! ran inline inside `SyncManager::start`'s `FuturesUnordered`. A
//! single slow session (#2199 makes 5–10s sessions plausible under
//! fuzzy load) blocked the same task that drives gossipsub
//! `Swarm::poll`, draining the libp2p stream-accept channel and
//! letting mesh peers prune the busy node — exactly the failure
//! described in #2293.
//!
//! ## Backpressure
//!
//! Bounded Actix mailbox via `set_mailbox_capacity`; `Addr::try_send`
//! returns `SendError::Full` on overflow. On overflow the dispatch
//! site logs the drop; the existing periodic-sync interval and
//! heartbeat-driven sync triggers cover dropped initiators, and
//! peers will retry dropped responder streams via their own retry
//! logic.
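//!
//! A dispatch site might look roughly like this (sketch; the
//! surrounding handler and the `sender` binding are assumed, not
//! taken verbatim from this crate):
//!
//! ```ignore
//! // Hypothetical responder dispatch when a sync stream opens.
//! let job = SyncSessionJob::Responder { peer_id, stream: Box::new(stream) };
//! if let Err(err) = sender.try_send(job) {
//!     // Mailbox full or actor gone: drop the stream; the peer's
//!     // own retry logic will re-open it later.
//!     tracing::warn!(?err, %peer_id, "dropping sync session");
//! }
//! ```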
//!
//! ## Mirrors `state_delta_bridge`
//!
//! Same shape as `state_delta_bridge::StateDeltaActor`: dedicated
//! Arbiter, bounded mailbox, `try_send`, `InFlightGuard`,
//! per-session `tokio::time::timeout`, and counters for
//! processed/error/timeout/dropped logged once a minute.

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

use actix::{
    Actor, ActorFutureExt, Addr, ArbiterHandle, AsyncContext, Context, Handler, Message, WrapFuture,
};
use calimero_network_primitives::stream::Stream;
use calimero_primitives::context::ContextId;
use libp2p::PeerId;
use tokio::sync::{mpsc, Semaphore};
use tracing::{debug, info, warn};

use calimero_node_primitives::sync::SyncProtocol;

use crate::sync::SyncManager;

/// Mailbox capacity for the sync-session actor.
///
/// Sync sessions are heavier than state-delta jobs but much less
/// frequent (a few per second per context vs. many per second). 256
/// covers >30s of bursts at the typical rate before drops; on
/// overflow we drop and rely on the periodic-sync interval to retry.
pub const SYNC_SESSION_CHANNEL_CAPACITY: usize = 256;

/// Periodic summary log interval.
const SUMMARY_INTERVAL: Duration = Duration::from_secs(60);

/// Result reported back to `SyncManager::start` for state tracking
/// (success-count, backoff). Mirrors the tuple the legacy
/// `FuturesUnordered`-based loop produced before #2316.
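///
/// A consumer typically unpacks the nested `result` like this
/// (sketch; `on_success`/`on_failure`/`on_timeout` stand in for the
/// manager's tracking updates and are not functions defined here):
///
/// ```ignore
/// match res.result {
///     Ok(Ok(protocol)) => on_success(res.context_id, protocol),
///     Ok(Err(err)) => on_failure(res.context_id, err),
///     Err(_elapsed) => on_timeout(res.context_id, res.took),
/// }
/// ```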
#[derive(Debug)]
pub struct SyncSessionResult {
    pub context_id: ContextId,
    pub peer_id: PeerId,
    pub took: Duration,
    /// `Ok(Ok(_))` = sync ran to completion; `Ok(Err(_))` = sync
    /// returned an error; `Err(_)` = session timed out.
    pub result: Result<Result<SyncProtocol, eyre::Error>, tokio::time::error::Elapsed>,
}

/// RAII guard that decrements [`SyncSessionActor::in_flight`] on
/// drop, including panic unwinds. Same pattern as
/// `state_delta_bridge::InFlightGuard`.
struct InFlightGuard {
    counter: Arc<AtomicU64>,
}

impl InFlightGuard {
    fn new(counter: Arc<AtomicU64>) -> Self {
        let _prev = counter.fetch_add(1, Ordering::Relaxed);
        Self { counter }
    }
}

impl Drop for InFlightGuard {
    fn drop(&mut self) {
        let _prev = self.counter.fetch_sub(1, Ordering::Relaxed);
    }
}

/// One unit of work routed to [`SyncSessionActor`].
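///
/// E.g. a periodic-sync tick might enqueue (sketch; `sender` and
/// `context_id` are assumed to be in scope):
///
/// ```ignore
/// let _ = sender.try_send(SyncSessionJob::Initiator {
///     context_id,
///     peer_id: None, // let the manager pick a peer
/// });
/// ```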
#[derive(Message)]
#[rtype(result = "()")]
pub enum SyncSessionJob {
    /// Inbound sync stream from a peer; runs `handle_opened_stream`
    /// (which dispatches to the appropriate responder).
    Responder {
        peer_id: PeerId,
        stream: Box<Stream>,
    },
    /// Locally-driven sync attempt; runs `perform_interval_sync`.
    /// `peer_id = None` lets the manager choose a peer.
    Initiator {
        context_id: ContextId,
        peer_id: Option<PeerId>,
    },
}

/// Sender side. Wraps `Addr<SyncSessionActor>` so dispatch sites can
/// `try_send` without depending on Actix types directly.
#[derive(Clone, Debug)]
pub struct SyncSessionSender {
    addr: Addr<SyncSessionActor>,
    dropped_total: Arc<AtomicU64>,
}

/// Error returned by [`SyncSessionSender::try_send`].
#[derive(Debug)]
pub enum SyncSessionSendError {
    /// Mailbox at capacity; drop and rely on periodic-sync retry.
    Full,
    /// Actor stopped — bridge is shutting down or has crashed.
    Closed,
}

impl SyncSessionSender {
    /// Non-blocking enqueue. Increments the drop counter on both
    /// `Full` and `Closed` so the periodic summary log doesn't
    /// undercount drops if the actor crashes or shuts down while the
    /// system is still running.
    pub fn try_send(&self, job: SyncSessionJob) -> Result<(), SyncSessionSendError> {
        match self.addr.try_send(job) {
            Ok(()) => Ok(()),
            Err(actix::dev::SendError::Full(_)) => {
                let _prev = self.dropped_total.fetch_add(1, Ordering::Relaxed);
                Err(SyncSessionSendError::Full)
            }
            Err(actix::dev::SendError::Closed(_)) => {
                let _prev = self.dropped_total.fetch_add(1, Ordering::Relaxed);
                Err(SyncSessionSendError::Closed)
            }
        }
    }
}

/// Sync-session dispatch actor. Runs on a dedicated Arbiter so a
/// long session (slow WASM merge-apply, divergent DAG) can't starve
/// the network/gossipsub task or the NodeManager mailbox.
pub struct SyncSessionActor {
    sync_manager: SyncManager,
    session_timeout: Duration,
    /// Caps concurrently-running sessions at `sync_config.max_concurrent`
    /// (default 30). The mailbox bounds *queued* jobs; this bounds
    /// *in-flight* jobs, restoring the limit the legacy
    /// `if futs.len() >= max_concurrent { advance().await }` check
    /// enforced before #2316. The acquire is unbounded — `acquire_owned`
    /// has no timeout — so the per-session `tokio::time::timeout` only
    /// applies to the work *after* the permit is held.
    concurrency: Arc<Semaphore>,
    /// Initiator results are forwarded here so `SyncManager::start`
    /// can update its per-context tracking state. `None` means
    /// results are dropped (e.g. in unit tests). Unbounded because a
    /// dropped result would leave the per-context `last_sync = None`
    /// forever (no `on_success`/`on_failure` would run), permanently
    /// stalling that context — same failure shape as the C1 dispatch
    /// stall fixed earlier in #2317.
    result_tx: Option<mpsc::UnboundedSender<SyncSessionResult>>,
    in_flight: Arc<AtomicU64>,
    processed_total: Arc<AtomicU64>,
    error_total: Arc<AtomicU64>,
    timeout_total: Arc<AtomicU64>,
    dropped_total: Arc<AtomicU64>,
}

impl SyncSessionActor {
    fn new(
        sync_manager: SyncManager,
        session_timeout: Duration,
        max_concurrent: usize,
        result_tx: Option<mpsc::UnboundedSender<SyncSessionResult>>,
        dropped_total: Arc<AtomicU64>,
    ) -> Self {
        Self {
            sync_manager,
            session_timeout,
            concurrency: Arc::new(Semaphore::new(max_concurrent)),
            result_tx,
            in_flight: Arc::new(AtomicU64::new(0)),
            processed_total: Arc::new(AtomicU64::new(0)),
            error_total: Arc::new(AtomicU64::new(0)),
            timeout_total: Arc::new(AtomicU64::new(0)),
            dropped_total,
        }
    }

    fn log_summary(&self) {
        let processed = self.processed_total.load(Ordering::Relaxed);
        let errors = self.error_total.load(Ordering::Relaxed);
        let timeouts = self.timeout_total.load(Ordering::Relaxed);
        let dropped = self.dropped_total.load(Ordering::Relaxed);
        let in_flight = self.in_flight.load(Ordering::Relaxed);
        info!(
            processed_total = processed,
            error_total = errors,
            timeout_total = timeouts,
            dropped_total = dropped,
            in_flight,
            "SyncSession actor summary"
        );
    }
}

impl Actor for SyncSessionActor {
    type Context = Context<Self>;

    fn started(&mut self, ctx: &mut Self::Context) {
        info!("SyncSession actor started on dedicated Arbiter");
        let _handle = ctx.run_interval(SUMMARY_INTERVAL, |actor, _ctx| {
            actor.log_summary();
        });
    }

    fn stopped(&mut self, _ctx: &mut Self::Context) {
        self.log_summary();
        info!("SyncSession actor stopped");
    }
}

impl Handler<SyncSessionJob> for SyncSessionActor {
    type Result = ();

    fn handle(&mut self, job: SyncSessionJob, ctx: &mut Self::Context) {
        let in_flight_guard = InFlightGuard::new(Arc::clone(&self.in_flight));

        let session_timeout = self.session_timeout;
        let sync_manager = self.sync_manager.clone();
        let result_tx = self.result_tx.clone();
        let concurrency = Arc::clone(&self.concurrency);

        match job {
            SyncSessionJob::Responder { peer_id, stream } => {
                // Responder: `handle_opened_stream` returns `()` so
                // there is no `error_total` distinction here — only
                // `processed_total` and `timeout_total`.
                let processed_total = Arc::clone(&self.processed_total);
                let timeout_total = Arc::clone(&self.timeout_total);
                let work = async move {
                    let _guard = in_flight_guard;
                    // Bound concurrent in-flight sessions to
                    // `sync_config.max_concurrent` (default 30); when
                    // saturated, queued jobs wait here rather than
                    // running unbounded. The session timeout below
                    // still applies once the permit is held.
                    let _permit = concurrency.acquire_owned().await.ok();
                    let started = Instant::now();
                    let outcome = tokio::time::timeout(
                        session_timeout,
                        sync_manager.handle_opened_stream(peer_id, stream),
                    )
                    .await;
                    match &outcome {
                        Ok(()) => {
                            let _prev = processed_total.fetch_add(1, Ordering::Relaxed);
                        }
                        Err(_elapsed) => {
                            let _prev = timeout_total.fetch_add(1, Ordering::Relaxed);
                        }
                    }
                    (outcome, started)
                };

                let _spawn_handle = ctx.spawn(work.into_actor(self).map(
                    move |(outcome, started), _act, _ctx| match outcome {
                        Ok(()) => {
                            debug!(
                                %peer_id,
                                elapsed_ms = started.elapsed().as_millis(),
                                "SyncSession responder completed"
                            );
                        }
                        Err(_elapsed) => {
                            warn!(
                                %peer_id,
                                timeout_secs = session_timeout.as_secs(),
                                elapsed_ms = started.elapsed().as_millis(),
                                "SyncSession responder exceeded timeout — dropping; peer will retry"
                            );
                        }
                    },
                ));
            }
            SyncSessionJob::Initiator {
                context_id,
                peer_id,
            } => {
                let processed_total = Arc::clone(&self.processed_total);
                let error_total = Arc::clone(&self.error_total);
                let timeout_total = Arc::clone(&self.timeout_total);
                let work = async move {
                    let _guard = in_flight_guard;
                    let _permit = concurrency.acquire_owned().await.ok();
                    let started = Instant::now();
                    let outcome = tokio::time::timeout(
                        session_timeout,
                        sync_manager.perform_interval_sync(context_id, peer_id),
                    )
                    .await;

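                    // Report the peer the sync actually ran against;
                    // fall back to the explicitly requested peer, or
                    // to a random `PeerId` when the session errored or
                    // timed out before any peer was chosen.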
                    let chosen_peer = outcome
                        .as_ref()
                        .ok()
                        .and_then(|r| r.as_ref().ok())
                        .map(|(p, _)| *p)
                        .or(peer_id)
                        .unwrap_or_else(PeerId::random);

                    match &outcome {
                        Ok(Ok(_)) => {
                            let _prev = processed_total.fetch_add(1, Ordering::Relaxed);
                        }
                        Ok(Err(_)) => {
                            let _prev = error_total.fetch_add(1, Ordering::Relaxed);
                        }
                        Err(_elapsed) => {
                            let _prev = timeout_total.fetch_add(1, Ordering::Relaxed);
                        }
                    }

                    let took = started.elapsed();
                    let result = outcome.map(|r| r.map(|(_, proto)| proto));
                    (result, took, chosen_peer)
                };

                let _spawn_handle = ctx.spawn(work.into_actor(self).map(
                    move |(result, took, chosen_peer), _act, _ctx| {
                        match &result {
                            Ok(Ok(_)) => debug!(
                                %context_id,
                                %chosen_peer,
                                took_ms = took.as_millis(),
                                "SyncSession initiator completed"
                            ),
                            Ok(Err(err)) => debug!(
                                %context_id,
                                %chosen_peer,
                                took_ms = took.as_millis(),
                                error = %err,
                                "SyncSession initiator failed"
                            ),
                            Err(_elapsed) => warn!(
                                %context_id,
                                %chosen_peer,
                                took_ms = took.as_millis(),
                                "SyncSession initiator exceeded timeout — dropping; periodic-sync will retry"
                            ),
                        }

                        if let Some(tx) = result_tx {
                            let session_result = SyncSessionResult {
                                context_id,
                                peer_id: chosen_peer,
                                took,
                                result,
                            };
                            // Unbounded: the only error is "receiver
                            // gone" (SyncManager loop shut down), and
                            // we don't need to retry in that case.
                            let _ignored = tx.send(session_result);
                        }
                    },
                ));
            }
        }
    }
}

/// Boot the [`SyncSessionActor`] on the supplied dedicated Arbiter
/// and return a [`SyncSessionSender`] for dispatch sites to hold.
///
/// `capacity` bounds the mailbox (queued jobs); `max_concurrent`
/// bounds the in-flight semaphore (running sessions) — together they
/// recreate the legacy `FuturesUnordered` queue + `max_concurrent`
/// cap that `SyncManager::start` enforced before #2316.
///
/// `result_tx` is the channel `SyncManager::start` reads from to
/// update its per-context tracking state. Pass `None` to discard
/// results (used in unit tests).
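///
/// Minimal wiring sketch (assumes a `SyncManager` value and a
/// running Actix system; the 30-second timeout is illustrative, not
/// a crate default):
///
/// ```ignore
/// let arbiter = actix::Arbiter::new();
/// let (result_tx, mut result_rx) = tokio::sync::mpsc::unbounded_channel();
/// let sender = start_sync_session_actor(
///     &arbiter.handle(),
///     SYNC_SESSION_CHANNEL_CAPACITY,
///     30, // sync_config.max_concurrent default
///     sync_manager,
///     std::time::Duration::from_secs(30),
///     Some(result_tx),
/// );
/// // `SyncManager::start` reads `result_rx` to update its
/// // per-context tracking state.
/// ```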
pub fn start_sync_session_actor(
    arbiter: &ArbiterHandle,
    capacity: usize,
    max_concurrent: usize,
    sync_manager: SyncManager,
    session_timeout: Duration,
    result_tx: Option<mpsc::UnboundedSender<SyncSessionResult>>,
) -> SyncSessionSender {
    let dropped_total = Arc::new(AtomicU64::new(0));
    let dropped_for_actor = Arc::clone(&dropped_total);

    let addr = SyncSessionActor::start_in_arbiter(arbiter, move |ctx| {
        ctx.set_mailbox_capacity(capacity);
        SyncSessionActor::new(
            sync_manager,
            session_timeout,
            max_concurrent,
            result_tx,
            dropped_for_actor,
        )
    });

    SyncSessionSender {
        addr,
        dropped_total,
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke check: the shared drop counter starts at zero. The
    /// sender wrapper itself needs a running Actix system, so
    /// functional coverage lives in the kv-store-with-handlers fuzzy
    /// test under issue #2316's acceptance criteria.
    #[test]
    fn dropped_total_starts_at_zero() {
        let dropped_total = Arc::new(AtomicU64::new(0));
        assert_eq!(dropped_total.load(Ordering::Relaxed), 0);
    }
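
    /// `InFlightGuard` increments its counter on construction and
    /// decrements it on drop, so the in-flight gauge balances back
    /// to zero; a sanity check of the RAII pattern used above.
    #[test]
    fn in_flight_guard_balances_counter() {
        let counter = Arc::new(AtomicU64::new(0));
        {
            let _guard = InFlightGuard::new(Arc::clone(&counter));
            assert_eq!(counter.load(Ordering::Relaxed), 1);
        }
        assert_eq!(counter.load(Ordering::Relaxed), 0);
    }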
}