Skip to main content

freenet/client_events/
mod.rs

1//! Clients events related logic and type definitions. For example, receival of client events from applications throught the HTTP gateway.
2
3use either::Either;
4use freenet_stdlib::{
5    client_api::{
6        ClientError, ClientRequest, ContractRequest, ContractResponse, ErrorKind, HostResponse,
7        QueryResponse,
8    },
9    prelude::*,
10};
11use futures::stream::FuturesUnordered;
12use futures::{future::BoxFuture, FutureExt, StreamExt};
13use std::cell::Cell;
14use std::fmt::Display;
15use std::sync::Arc;
16use std::{convert::Infallible, fmt::Debug};
17use tracing::Instrument;
18
19use serde::{Deserialize, Serialize};
20use tokio::sync::mpsc::{self, UnboundedSender};
21
22use crate::contract::{ClientResponsesReceiver, ContractHandlerEvent};
23use crate::message::{NodeEvent, QueryResult};
24use crate::node::OpManager;
25use crate::operations::{get, put, update, OpError, VisitedPeers};
26use crate::ring::KnownPeerKeyLocation;
27use crate::tracing::NetEventLog;
28use crate::{
29    config::{GlobalExecutor, GlobalRng},
30    contract::StoreResponse,
31};
32
33// pub(crate) mod admin_endpoints; // TODO: Add axum dependencies
34pub(crate) mod combinator;
35#[cfg(test)]
36mod integration_verification;
37pub(crate) mod result_router;
38pub(crate) mod session_actor;
39#[cfg(test)]
40mod test_correlation;
41#[cfg(feature = "websocket")]
42pub(crate) mod websocket;
43
44pub(crate) type BoxedClient = Box<dyn ClientEventsProxy + Send + 'static>;
45pub type HostResult = Result<HostResponse, ClientError>;
46
47/// Request correlation ID for end-to-end tracing
48#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
49#[repr(transparent)]
50pub struct RequestId(u64);
51
52const COUNTER_BLOCK: u64 = 1_000_000;
53
54thread_local! {
55    static REQUEST_ID_COUNTER: Cell<u64> = {
56        let idx = crate::config::GlobalRng::thread_index();
57        Cell::new(1 + idx * COUNTER_BLOCK)
58    };
59}
60
61impl RequestId {
62    pub fn new() -> Self {
63        Self(REQUEST_ID_COUNTER.with(|c| {
64            let v = c.get();
65            c.set(v + 1);
66            v
67        }))
68    }
69
70    /// Reset the request ID counter to initial state for this thread.
71    /// Thread-local, so safe for parallel test execution.
72    pub fn reset_counter() {
73        let idx = crate::config::GlobalRng::thread_index();
74        REQUEST_ID_COUNTER.with(|c| c.set(1 + idx * COUNTER_BLOCK));
75    }
76}
77
78impl Default for RequestId {
79    fn default() -> Self {
80        Self::new()
81    }
82}
83
84impl std::fmt::Display for RequestId {
85    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        write!(f, "req-{}", self.0)
87    }
88}
89
90#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
91#[repr(transparent)]
92pub struct ClientId(usize);
93
94impl From<ClientId> for usize {
95    fn from(val: ClientId) -> Self {
96        val.0
97    }
98}
99
100thread_local! {
101    static CLIENT_ID_COUNTER: Cell<usize> = {
102        let idx = crate::config::GlobalRng::thread_index();
103        Cell::new(1 + (idx as usize) * (COUNTER_BLOCK as usize))
104    };
105}
106
107impl ClientId {
108    pub const FIRST: Self = ClientId(0);
109
110    pub fn next() -> Self {
111        ClientId(CLIENT_ID_COUNTER.with(|c| {
112            let v = c.get();
113            c.set(v + 1);
114            v
115        }))
116    }
117
118    /// Reset the client ID counter to initial state for this thread.
119    /// Thread-local, so safe for parallel test execution.
120    pub fn reset_counter() {
121        let idx = crate::config::GlobalRng::thread_index();
122        CLIENT_ID_COUNTER.with(|c| c.set(1 + (idx as usize) * (COUNTER_BLOCK as usize)));
123    }
124}
125
126impl Display for ClientId {
127    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128        write!(f, "{}", self.0)
129    }
130}
131
132type HostIncomingMsg = Result<OpenRequest<'static>, ClientError>;
133
134#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
135pub struct AuthToken(#[serde(deserialize_with = "AuthToken::deser_auth_token")] Arc<str>);
136
137impl AuthToken {
138    pub fn as_str(&self) -> &str {
139        &self.0
140    }
141
142    pub fn generate() -> AuthToken {
143        let mut token = [0u8; 32];
144        GlobalRng::fill_bytes(&mut token);
145        let token_str = bs58::encode(token).into_string();
146        AuthToken::from(token_str)
147    }
148}
149
150impl std::ops::Deref for AuthToken {
151    type Target = str;
152
153    fn deref(&self) -> &Self::Target {
154        &self.0
155    }
156}
157
158impl AuthToken {
159    fn deser_auth_token<'de, D>(deser: D) -> Result<Arc<str>, D::Error>
160    where
161        D: serde::Deserializer<'de>,
162    {
163        let value = <String as Deserialize>::deserialize(deser)?;
164        Ok(value.into())
165    }
166}
167
168impl From<String> for AuthToken {
169    fn from(value: String) -> Self {
170        Self(value.into())
171    }
172}
173
174#[non_exhaustive]
175pub struct OpenRequest<'a> {
176    pub client_id: ClientId,
177    pub request_id: RequestId,
178    pub request: Box<ClientRequest<'a>>,
179    pub notification_channel: Option<UnboundedSender<HostResult>>,
180    pub token: Option<AuthToken>,
181    pub attested_contract: Option<ContractInstanceId>,
182}
183
184impl Display for OpenRequest<'_> {
185    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
186        write!(
187            f,
188            "client request {{ client: {}, request_id: {}, req: {} }}",
189            &self.client_id, &self.request_id, &*self.request
190        )
191    }
192}
193
194impl<'a> OpenRequest<'a> {
195    pub fn into_owned(self) -> OpenRequest<'static> {
196        OpenRequest {
197            request: Box::new(self.request.into_owned()),
198            ..self
199        }
200    }
201
202    pub fn new(id: ClientId, request: Box<ClientRequest<'a>>) -> Self {
203        Self {
204            client_id: id,
205            request_id: RequestId::new(),
206            request,
207            notification_channel: None,
208            token: None,
209            attested_contract: None,
210        }
211    }
212
213    pub fn with_notification(mut self, ch: UnboundedSender<HostResult>) -> Self {
214        self.notification_channel = Some(ch);
215        self
216    }
217
218    pub fn with_token(mut self, token: Option<AuthToken>) -> Self {
219        self.token = token;
220        self
221    }
222
223    pub fn with_attested_contract(mut self, contract: Option<ContractInstanceId>) -> Self {
224        self.attested_contract = contract;
225        self
226    }
227}
228
229pub trait ClientEventsProxy {
230    /// # Cancellation Safety
231    /// This future must be safe to cancel.
232    fn recv(&mut self) -> BoxFuture<'_, HostIncomingMsg>;
233
234    /// Sends a response from the host to the client application.
235    fn send(
236        &mut self,
237        id: ClientId,
238        response: Result<HostResponse, ClientError>,
239    ) -> BoxFuture<'_, Result<(), ClientError>>;
240}
241
242/// Helper function to register a subscription listener for GET/PUT operations with auto-subscribe
243async fn register_subscription_listener(
244    op_manager: &OpManager,
245    instance_id: ContractInstanceId,
246    client_id: ClientId,
247    subscription_listener: UnboundedSender<HostResult>,
248    operation_type: &str,
249) -> Result<(), Error> {
250    tracing::debug!(
251        client_id = %client_id,
252        contract = %instance_id,
253        operation = operation_type,
254        "Registering subscription listener"
255    );
256    let register_listener = op_manager
257        .notify_contract_handler(ContractHandlerEvent::RegisterSubscriberListener {
258            key: instance_id,
259            client_id,
260            summary: None, // No summary for GET/PUT-based subscriptions
261            subscriber_listener: subscription_listener,
262        })
263        .await
264        .inspect_err(|err| {
265            tracing::error!(
266                client_id = %client_id,
267                contract = %instance_id,
268                operation = operation_type,
269                error = %err,
270                "Register subscriber listener failed"
271            );
272        });
273    match register_listener {
274        Ok(ContractHandlerEvent::RegisterSubscriberListenerResponse) => {
275            tracing::debug!(
276                client_id = %client_id,
277                contract = %instance_id,
278                operation = operation_type,
279                "Subscriber listener registered successfully"
280            );
281            // Register client subscription to prevent upstream unsubscription while this client is active
282            let result = op_manager
283                .ring
284                .add_client_subscription(&instance_id, client_id);
285            // Emit telemetry if this was the first client (seeding started)
286            if result.is_first_client {
287                if let Some(event) = NetEventLog::seeding_started(&op_manager.ring, instance_id) {
288                    op_manager.ring.register_events(Either::Left(event)).await;
289                }
290            }
291            Ok(())
292        }
293        _ => {
294            tracing::error!(
295                client_id = %client_id,
296                contract = %instance_id,
297                operation = operation_type,
298                phase = "registration_failed",
299                "Subscriber listener registration failed"
300            );
301            Err(Error::Op(OpError::UnexpectedOpState))
302        }
303    }
304}
305
306/// Report an operation init failure to the client via the result router.
307async fn report_op_init_error(
308    op_manager: &OpManager,
309    tx: crate::message::Transaction,
310    contract: &impl std::fmt::Display,
311    op_name: &str,
312    err: &OpError,
313    client_id: ClientId,
314    request_id: RequestId,
315) {
316    tracing::error!(
317        client_id = %client_id,
318        request_id = %request_id,
319        tx = %tx,
320        contract = %contract,
321        error = %err,
322        phase = "error",
323        "{op_name} request failed"
324    );
325
326    // Convert ring errors to type-safe ErrorKind variants so that downstream
327    // consumers (e.g. the HTTP handler's SERVICE_UNAVAILABLE page) can match on
328    // them instead of relying on error message string contents.
329    let error_kind = match err {
330        OpError::RingError(crate::ring::RingError::EmptyRing) => ErrorKind::EmptyRing,
331        OpError::RingError(crate::ring::RingError::PeerNotJoined) => ErrorKind::PeerNotJoined,
332        OpError::RingError(crate::ring::RingError::ConnError(_))
333        | OpError::RingError(crate::ring::RingError::NoCachingPeers(_))
334        | OpError::ConnError(_)
335        | OpError::ContractError(_)
336        | OpError::ExecutorError(_)
337        | OpError::UnexpectedOpState
338        | OpError::InvalidStateTransition { .. }
339        | OpError::NotificationError
340        | OpError::NotificationChannelError(_)
341        | OpError::IncorrectTxType(..)
342        | OpError::OpNotPresent(_)
343        | OpError::OpNotAvailable(_)
344        | OpError::StreamCancelled
345        | OpError::OrphanStreamClaimFailed
346        | OpError::StatePushed => ErrorKind::OperationError {
347            cause: format!("{op_name} operation failed: {err}").into(),
348        },
349    };
350
351    let error_response = Err(error_kind.into());
352
353    if let Err(e) = op_manager.result_router_tx.send((tx, error_response)).await {
354        tracing::error!(
355            tx = %tx,
356            error = %e,
357            "Failed to send {op_name} error to result router"
358        );
359    }
360
361    // Clean up request router so subsequent requests for the same resource
362    // create a fresh operation instead of reusing this failed transaction.
363    // Without this, the resource→transaction mapping persists and new clients
364    // get stale cached errors indefinitely (see diagnostic report 8TSMXY).
365    op_manager.completed(tx);
366}
367
368/// Process client events.
369///
370/// # Architecture: Dual-Mode Client Handling
371///
372/// This function operates in one of two modes based on `op_manager.actor_clients`:
373///
374/// - Uses ResultRouter → SessionActor for centralized client communication
375/// - Uses RequestRouter for operation deduplication (multiple clients share one operation)
376/// - More scalable and efficient for concurrent clients
377pub async fn client_event_handling<ClientEv>(
378    op_manager: Arc<OpManager>,
379    mut client_events: ClientEv,
380    mut client_responses: ClientResponsesReceiver,
381    node_controller: tokio::sync::mpsc::Sender<NodeEvent>,
382) -> anyhow::Result<Infallible>
383where
384    ClientEv: ClientEventsProxy + Send + 'static,
385{
386    let request_router = std::sync::Arc::new(crate::node::RequestRouter::new());
387    // Register the router with op_manager so completed operations clean up stale entries.
388    // Without this, subsequent requests for the same resource would hang forever.
389    op_manager.set_request_router(request_router.clone());
390    let request_router = Some(request_router);
391    let mut results = FuturesUnordered::new();
392    loop {
393        // Uses deterministic_select! for DST - guards are evaluated BEFORE futures are created
394        crate::deterministic_select! {
395            client_request = client_events.recv() => {
396                let req = match client_request {
397                    Ok(request) => {
398                        tracing::debug!(
399                            client_id = %request.client_id,
400                            request_id = %request.request_id,
401                            request_type = ?request.request,
402                            "Received client request"
403                        );
404                        request
405                    }
406                    Err(error) if matches!(error.kind(), ErrorKind::Shutdown) => {
407                        node_controller.send(NodeEvent::Disconnect { cause: None }).await.ok();
408                        anyhow::bail!("shutdown event");
409                    }
410                    Err(error) if matches!(error.kind(), ErrorKind::TransportProtocolDisconnect) => {
411                        // A single client disconnecting is not fatal — continue serving
412                        // other clients. The combinator already cleaned up the dead slot.
413                        tracing::debug!(error = %error, "Client transport disconnected");
414                        continue;
415                    }
416                    Err(error) => {
417                        tracing::debug!(error = %error, "Client error");
418                        continue;
419                    }
420                };
421                let cli_id = req.client_id;
422                let res = process_open_request(req, op_manager.clone(), request_router.clone()).await;
423                results.push(async move {
424                    match res.await {
425                        Ok(Some(Either::Left(res))) => (cli_id, Ok(Some(res))),
426                        Ok(Some(Either::Right(mut cb))) => {
427                            match cb.recv().await {
428                                Some(res) => (cli_id, Ok(Some(res))),
429                                None => (cli_id, Err(ClientError::from(ErrorKind::ChannelClosed))),
430                            }
431                        }
432                        Ok(None) => (cli_id, Ok(None)),
433                        Err(Error::Disconnected) => {
434                            tracing::debug!(client_id = %cli_id, "Client disconnected");
435                            (cli_id, Err(ClientError::from(ErrorKind::Disconnect)))
436                        }
437                        Err(Error::PeerNotJoined) => {
438                            tracing::warn!(
439                                client_id = %cli_id,
440                                "Operation rejected: peer has not joined network yet - client should retry after join"
441                            );
442                            (cli_id, Err(ErrorKind::PeerNotJoined.into()))
443                        }
444                        Err(Error::EmptyRing) => {
445                            tracing::warn!(
446                                client_id = %cli_id,
447                                "Operation rejected: no ring connections found - client should retry after connections are established"
448                            );
449                            (cli_id, Err(ErrorKind::EmptyRing.into()))
450                        }
451                        Err(err) => {
452                            tracing::error!(
453                                client_id = %cli_id,
454                                error = %err,
455                                "Operation error"
456                            );
457                            (cli_id, Err(ErrorKind::OperationError { cause: format!("{err}").into() }.into()))
458                        }
459                    }
460                });
461            },
462            res = client_responses.recv() => {
463                if let Some((cli_id, request_id, res)) = res {
464                    if let Ok(result) = &res {
465                        tracing::debug!(
466                            client_id = %cli_id,
467                            request_id = %request_id,
468                            response = %result,
469                            "Sending client response"
470                        );
471                    }
472                    if let Err(err) = client_events.send(cli_id, res).await {
473                        tracing::debug!(
474                            client_id = %cli_id,
475                            error = %err,
476                            "Client channel closed, response dropped"
477                        );
478                    }
479                }
480            },
481            res = results.next(), if !results.is_empty() => {
482                let Some(f_res) = res else {
483                    unreachable!("results.next() should only return None if results is empty, which is guarded against");
484                };
485                match f_res {
486                    (cli_id, Ok(Some(res))) => {
487                        let res = match res {
488                            QueryResult::Connections(conns) => {
489                                // Connected peers must have known addresses - use type-safe conversion
490                                Ok(HostResponse::QueryResponse(QueryResponse::ConnectedPeers {
491                                    peers: conns.into_iter().filter_map(|p| {
492                                        KnownPeerKeyLocation::try_from(&p).ok().map(|known| {
493                                            (p.pub_key.to_string(), known.socket_addr())
494                                        })
495                                    }).collect() }
496                                ))
497                            }
498                            QueryResult::GetResult { key, state, contract } => {
499                                Ok(HostResponse::ContractResponse(ContractResponse::GetResponse {
500                                    key,
501                                    state,
502                                    contract,
503                                }))
504                            }
505                            QueryResult::DelegateResult { response, .. } => {
506                                response
507                            }
508                            QueryResult::NetworkDebug(debug_info) => {
509                                // Convert internal types to stdlib types
510                                let subscriptions = debug_info.application_subscriptions.into_iter().map(|sub| {
511                                    freenet_stdlib::client_api::SubscriptionInfo {
512                                        contract_key: sub.instance_id,
513                                        client_id: sub.client_id.into(),
514                                    }
515                                }).collect();
516
517                                // Connected peers must have known addresses - use type-safe conversion
518                                let connected_peers = debug_info.connected_peers.into_iter().filter_map(|peer| {
519                                    KnownPeerKeyLocation::try_from(&peer).ok().map(|known| {
520                                        (peer.to_string(), known.socket_addr())
521                                    })
522                                }).collect();
523
524                                Ok(HostResponse::QueryResponse(QueryResponse::NetworkDebug(
525                                    freenet_stdlib::client_api::NetworkDebugInfo {
526                                        subscriptions,
527                                        connected_peers,
528                                    }
529                                )))
530                            }
531                            QueryResult::NodeDiagnostics(response) => {
532                                Ok(HostResponse::QueryResponse(QueryResponse::NodeDiagnostics(response)))
533                            }
534                        };
535                        if let Ok(result) = &res {
536                            tracing::debug!(
537                                client_id = %cli_id,
538                                response = %result,
539                                "Sending client operation response"
540                            );
541                        }
542                        if let Err(err) = client_events.send(cli_id, res).await {
543                            tracing::debug!(
544                                client_id = %cli_id,
545                                error = %err,
546                                "Client channel closed, operation response dropped"
547                            );
548                        }
549                    }
550                    (_, Ok(None)) => continue,
551                    (cli_id, Err(err)) => {
552                        tracing::error!(
553                            client_id = %cli_id,
554                            error = %err,
555                            "Sending error response to client"
556                        );
557                        if let Err(send_err) = client_events.send(cli_id, Err(err)).await {
558                            tracing::debug!(
559                                client_id = %cli_id,
560                                error = %send_err,
561                                "Client channel closed, error response dropped"
562                            );
563                        }
564                    }
565                }
566            },
567        }
568    }
569}
570
571#[derive(Debug, thiserror::Error)]
572enum Error {
573    #[error("Node not connected to network")]
574    Disconnected,
575    #[error("Peer has not joined the network yet (no ring location established)")]
576    PeerNotJoined,
577    #[error("No ring connections found")]
578    EmptyRing,
579    #[error("Node error: {0}")]
580    Node(String),
581    #[error(transparent)]
582    Contract(#[from] crate::contract::ContractError),
583    #[error(transparent)]
584    Op(#[from] OpError),
585    #[error(transparent)]
586    Executor(#[from] crate::contract::ExecutorError),
587    #[error(transparent)]
588    Panic(#[from] tokio::task::JoinError),
589}
590
591impl From<crate::ring::RingError> for Error {
592    fn from(err: crate::ring::RingError) -> Self {
593        match err {
594            crate::ring::RingError::PeerNotJoined => Error::PeerNotJoined,
595            crate::ring::RingError::EmptyRing => Error::EmptyRing,
596            other @ crate::ring::RingError::ConnError(_)
597            | other @ crate::ring::RingError::NoCachingPeers(_) => Error::Node(other.to_string()),
598        }
599    }
600}
601
602/// Check that the peer has completed network join before allowing operations.
603/// For gateways: always ready (their address is set from config).
604/// For regular peers: must wait for handshake to complete (peer_ready flag).
605fn ensure_peer_ready(op_manager: &OpManager) -> Result<std::net::SocketAddr, Error> {
606    if !op_manager.is_gateway
607        && !op_manager
608            .peer_ready
609            .load(std::sync::atomic::Ordering::SeqCst)
610    {
611        return Err(Error::PeerNotJoined);
612    }
613    Ok(op_manager.ring.connection_manager.peer_addr()?)
614}
615
616#[inline]
617async fn process_open_request(
618    mut request: OpenRequest<'static>,
619    op_manager: Arc<OpManager>,
620    request_router: Option<Arc<crate::node::RequestRouter>>,
621) -> BoxFuture<'static, Result<Option<Either<QueryResult, mpsc::Receiver<QueryResult>>>, Error>> {
622    let (callback_tx, callback_rx) = if matches!(
623        &*request.request,
624        ClientRequest::NodeQueries(_) | ClientRequest::ContractOp(ContractRequest::Get { .. })
625    ) {
626        let (tx, rx) = mpsc::channel(1);
627        (Some(tx), Some(rx))
628    } else {
629        (None, None)
630    };
631
632    // TODO: wait until we have a peer_id to attempt (should be connected)
633    // this will indirectly start actions on the local contract executor
634    let fut = async move {
635        let client_id = request.client_id;
636        let request_id = request.request_id;
637
638        let subscription_listener: Option<UnboundedSender<HostResult>> =
639            request.notification_channel.take();
640
641        match *request.request {
642            ClientRequest::ContractOp(ops) => {
643                match ops {
644                    ContractRequest::Put {
645                        state,
646                        contract,
647                        related_contracts,
648                        subscribe,
649                        blocking_subscribe,
650                    } => {
651                        let peer_id = ensure_peer_ready(&op_manager)?;
652
653                        tracing::debug!(
654                            client_id = %client_id,
655                            request_id = %request_id,
656                            peer = %peer_id,
657                            phase = "request",
658                            "Received PUT request from client"
659                        );
660
661                        let contract_key = contract.key();
662
663                        // Check if this will be a local-only PUT (no network peers available)
664                        // This prevents race condition where PUT completes instantly and TX is removed
665                        // before a second client can reuse it (issue #1886)
666                        let own_location = op_manager.ring.connection_manager.own_location();
667                        let skip_list: Vec<_> = own_location.socket_addr().into_iter().collect();
668                        let has_remote_peers = op_manager
669                            .ring
670                            .closest_potentially_caching(&contract_key, skip_list.as_slice())
671                            .is_some();
672
673                        if !has_remote_peers {
674                            // Local-only PUT - bypass router to avoid race condition
675                            tracing::debug!(
676                                client_id = %client_id,
677                                request_id = %request_id,
678                                peer = %peer_id,
679                                contract = %contract_key,
680                                phase = "local_only",
681                                "PUT will complete locally (no remote peers)"
682                            );
683
684                            // Start a local PUT operation without going through the router
685                            // This avoids the race condition while still providing proper result delivery
686                            let op = put::start_op(
687                                contract.clone(),
688                                related_contracts.clone(),
689                                state.clone(),
690                                op_manager.ring.max_hops_to_live,
691                                subscribe,
692                                blocking_subscribe,
693                            );
694                            let op_id = op.id;
695
696                            op_manager
697                                .ch_outbound
698                                .waiting_for_transaction_result(op_id, client_id, request_id)
699                                .await
700                                .inspect_err(|err| {
701                                    tracing::error!(
702                                        client_id = %client_id,
703                                        request_id = %request_id,
704                                        tx = %op_id,
705                                        error = %err,
706                                        "Error waiting for transaction result"
707                                    )
708                                })?;
709
710                            if let Err(err) = put::request_put(&op_manager, op).await {
711                                report_op_init_error(
712                                    &op_manager,
713                                    op_id,
714                                    &contract_key,
715                                    "PUT",
716                                    &err,
717                                    client_id,
718                                    request_id,
719                                )
720                                .await;
721                            } else if subscribe {
722                                if let Some(sl) = subscription_listener {
723                                    register_subscription_listener(
724                                        &op_manager,
725                                        *contract_key.id(),
726                                        client_id,
727                                        sl,
728                                        "PUT",
729                                    )
730                                    .await?;
731                                } else {
732                                    tracing::warn!(
733                                        client_id = %client_id,
734                                        contract = %contract_key,
735                                        "PUT with subscribe=true but no subscription_listener"
736                                    );
737                                }
738                            }
739                        } else if let Some(router) = &request_router {
740                            tracing::debug!(
741                                client_id = %client_id,
742                                request_id = %request_id,
743                                peer = %peer_id,
744                                contract = %contract_key,
745                                phase = "routing",
746                                "Routing PUT request through deduplication layer"
747                            );
748
749                            let request = crate::node::DeduplicatedRequest::Put {
750                                key: contract_key,
751                                contract: contract.clone(),
752                                related_contracts: related_contracts.clone(),
753                                state: state.clone(),
754                                subscribe,
755                                blocking_subscribe,
756                                client_id,
757                                request_id,
758                            };
759
760                            let (transaction_id, should_start_operation) =
761                                router.route_request(request).await.map_err(|e| {
762                                    Error::Node(format!("Request routing failed: {}", e))
763                                })?;
764
765                            op_manager
766                                .ch_outbound
767                                .waiting_for_transaction_result(
768                                    transaction_id,
769                                    client_id,
770                                    request_id,
771                                )
772                                .await
773                                .inspect_err(|err| {
774                                    tracing::error!(
775                                        "Error waiting for transaction result: {}",
776                                        err
777                                    );
778                                })?;
779
780                            if should_start_operation {
781                                tracing::debug!(
782                                    client_id = %client_id,
783                                    request_id = %request_id,
784                                    tx = %transaction_id,
785                                    peer = %peer_id,
786                                    contract = %contract_key,
787                                    phase = "new_operation",
788                                    "Starting new PUT network operation"
789                                );
790
791                                let op = put::start_op_with_id(
792                                    contract.clone(),
793                                    related_contracts.clone(),
794                                    state.clone(),
795                                    op_manager.ring.max_hops_to_live,
796                                    subscribe,
797                                    blocking_subscribe,
798                                    transaction_id,
799                                );
800
801                                if let Err(err) = put::request_put(&op_manager, op).await {
802                                    report_op_init_error(
803                                        &op_manager,
804                                        transaction_id,
805                                        &contract_key,
806                                        "PUT",
807                                        &err,
808                                        client_id,
809                                        request_id,
810                                    )
811                                    .await;
812                                } else if subscribe {
813                                    if let Some(sl) = subscription_listener {
814                                        register_subscription_listener(
815                                            &op_manager,
816                                            *contract_key.id(),
817                                            client_id,
818                                            sl,
819                                            "PUT",
820                                        )
821                                        .await?;
822                                    } else {
823                                        tracing::warn!(
824                                            client_id = %client_id,
825                                            contract = %contract_key,
826                                            "PUT with subscribe=true but no subscription_listener"
827                                        );
828                                    }
829                                }
830                            } else {
831                                tracing::debug!(
832                                    client_id = %client_id,
833                                    request_id = %request_id,
834                                    tx = %transaction_id,
835                                    peer = %peer_id,
836                                    contract = %contract_key,
837                                    phase = "reuse",
838                                    "Reusing existing PUT operation - client registered for result"
839                                );
840                                if subscribe {
841                                    if let Some(sl) = subscription_listener {
842                                        register_subscription_listener(
843                                            &op_manager,
844                                            *contract_key.id(),
845                                            client_id,
846                                            sl,
847                                            "PUT",
848                                        )
849                                        .await?;
850                                    } else {
851                                        tracing::warn!(
852                                            client_id = %client_id,
853                                            contract = %contract_key,
854                                            "PUT with subscribe=true but no subscription_listener"
855                                        );
856                                    }
857                                }
858                            }
859                        } else {
860                            tracing::debug!(
861                                client_id = %client_id,
862                                request_id = %request_id,
863                                peer = %peer_id,
864                                contract = %contract_key,
865                                phase = "legacy",
866                                "Starting direct PUT operation (legacy mode)"
867                            );
868
869                            let op = put::start_op(
870                                contract.clone(),
871                                related_contracts.clone(),
872                                state.clone(),
873                                op_manager.ring.max_hops_to_live,
874                                subscribe,
875                                blocking_subscribe,
876                            );
877                            let op_id = op.id;
878
879                            op_manager
880                                .ch_outbound
881                                .waiting_for_transaction_result(op_id, client_id, request_id)
882                                .await
883                                .inspect_err(|err| {
884                                    tracing::error!(
885                                        client_id = %client_id,
886                                        request_id = %request_id,
887                                        tx = %op_id,
888                                        error = %err,
889                                        "Error waiting for transaction result"
890                                    )
891                                })?;
892
893                            if let Err(err) = put::request_put(&op_manager, op).await {
894                                report_op_init_error(
895                                    &op_manager,
896                                    op_id,
897                                    &contract_key,
898                                    "PUT",
899                                    &err,
900                                    client_id,
901                                    request_id,
902                                )
903                                .await;
904                            } else if subscribe {
905                                if let Some(sl) = subscription_listener {
906                                    register_subscription_listener(
907                                        &op_manager,
908                                        *contract_key.id(),
909                                        client_id,
910                                        sl,
911                                        "PUT",
912                                    )
913                                    .await?;
914                                } else {
915                                    tracing::warn!(
916                                        client_id = %client_id,
917                                        contract = %contract_key,
918                                        "PUT with subscribe=true but no subscription_listener"
919                                    );
920                                }
921                            }
922                        }
923                    }
924                    ContractRequest::Update { key, data } => {
925                        let peer_id = ensure_peer_ready(&op_manager)?;
926
927                        tracing::debug!(
928                            client_id = %client_id,
929                            request_id = %request_id,
930                            peer = %peer_id,
931                            contract = %key,
932                            phase = "request",
933                            "Received UPDATE request from client"
934                        );
935
936                        let related_contracts = RelatedContracts::default();
937
938                        tracing::debug!(
939                            client_id = %client_id,
940                            request_id = %request_id,
941                            peer = %peer_id,
942                            contract = %key,
943                            data = ?data,
944                            phase = "starting",
945                            "Starting UPDATE operation - passing delta to network layer"
946                        );
947
948                        // Convert UpdateData to 'static lifetime for storage in operation state.
949                        // This is safe because we're cloning the underlying bytes.
950                        let update_data: UpdateData<'static> = match data {
951                            UpdateData::State(s) => UpdateData::State(State::from(s.into_bytes())),
952                            UpdateData::Delta(d) => {
953                                UpdateData::Delta(StateDelta::from(d.into_bytes()))
954                            }
955                            UpdateData::StateAndDelta { state, delta } => {
956                                UpdateData::StateAndDelta {
957                                    state: State::from(state.into_bytes()),
958                                    delta: StateDelta::from(delta.into_bytes()),
959                                }
960                            }
961                            UpdateData::RelatedState { related_to, state } => {
962                                UpdateData::RelatedState {
963                                    related_to,
964                                    state: State::from(state.into_bytes()),
965                                }
966                            }
967                            UpdateData::RelatedDelta { related_to, delta } => {
968                                UpdateData::RelatedDelta {
969                                    related_to,
970                                    delta: StateDelta::from(delta.into_bytes()),
971                                }
972                            }
973                            UpdateData::RelatedStateAndDelta {
974                                related_to,
975                                state,
976                                delta,
977                            } => UpdateData::RelatedStateAndDelta {
978                                related_to,
979                                state: State::from(state.into_bytes()),
980                                delta: StateDelta::from(delta.into_bytes()),
981                            },
982                        };
983
984                        tracing::debug!(
985                            client_id = %client_id,
986                            request_id = %request_id,
987                            peer = %peer_id,
988                            contract = %key,
989                            phase = "sending",
990                            "Sending UPDATE operation to network layer"
991                        );
992
993                        if let Some(router) = &request_router {
994                            tracing::debug!(
995                                client_id = %client_id,
996                                request_id = %request_id,
997                                peer = %peer_id,
998                                contract = %key,
999                                phase = "routing",
1000                                "Routing UPDATE request through deduplication layer"
1001                            );
1002
1003                            let request = crate::node::DeduplicatedRequest::Update {
1004                                key,
1005                                update_data: update_data.clone(),
1006                                related_contracts: related_contracts.clone(),
1007                                client_id,
1008                                request_id,
1009                            };
1010
1011                            let (transaction_id, should_start_operation) =
1012                                router.route_request(request).await.map_err(|e| {
1013                                    Error::Node(format!("Request routing failed: {}", e))
1014                                })?;
1015
1016                            // Always register this client for the result
1017                            op_manager
1018                                .ch_outbound
1019                                .waiting_for_transaction_result(
1020                                    transaction_id,
1021                                    client_id,
1022                                    request_id,
1023                                )
1024                                .await
1025                                .inspect_err(|err| {
1026                                    tracing::error!(
1027                                        "Error waiting for transaction result: {}",
1028                                        err
1029                                    );
1030                                })?;
1031
1032                            // Only start new network operation if this is a new operation
1033                            if should_start_operation {
1034                                tracing::debug!(
1035                                    client_id = %client_id,
1036                                    request_id = %request_id,
1037                                    tx = %transaction_id,
1038                                    peer = %peer_id,
1039                                    contract = %key,
1040                                    phase = "new_operation",
1041                                    "Starting new UPDATE network operation"
1042                                );
1043
1044                                let op = update::start_op_with_id(
1045                                    key,
1046                                    update_data.clone(),
1047                                    related_contracts,
1048                                    transaction_id,
1049                                );
1050
1051                                tracing::debug!(
1052                                    request_id = %request_id,
1053                                    transaction_id = %op.id,
1054                                    operation = "update",
1055                                    "Request-Transaction correlation"
1056                                );
1057
1058                                match update::request_update(&op_manager, op).await {
1059                                    Ok(()) | Err(OpError::StatePushed) => {}
1060                                    Err(err) => {
1061                                        report_op_init_error(
1062                                            &op_manager,
1063                                            transaction_id,
1064                                            &key,
1065                                            "UPDATE",
1066                                            &err,
1067                                            client_id,
1068                                            request_id,
1069                                        )
1070                                        .await;
1071                                    }
1072                                }
1073                            } else {
1074                                tracing::debug!(
1075                                    client_id = %client_id,
1076                                    request_id = %request_id,
1077                                    tx = %transaction_id,
1078                                    peer = %peer_id,
1079                                    contract = %key,
1080                                    phase = "reuse",
1081                                    "Reusing existing UPDATE operation - client registered for result"
1082                                );
1083                            }
1084                        } else {
1085                            tracing::debug!(
1086                                client_id = %client_id,
1087                                request_id = %request_id,
1088                                peer = %peer_id,
1089                                contract = %key,
1090                                phase = "legacy",
1091                                "Starting direct UPDATE operation (legacy mode)"
1092                            );
1093
1094                            // Legacy mode: direct operation without deduplication
1095                            let op = update::start_op(key, update_data, related_contracts);
1096                            let op_id = op.id;
1097
1098                            tracing::debug!(
1099                                request_id = %request_id,
1100                                transaction_id = %op_id,
1101                                operation = "update",
1102                                "Request-Transaction correlation"
1103                            );
1104
1105                            op_manager
1106                                .ch_outbound
1107                                .waiting_for_transaction_result(op_id, client_id, request_id)
1108                                .await
1109                                .inspect_err(|err| {
1110                                    tracing::error!(
1111                                        "Error waiting for transaction result: {}",
1112                                        err
1113                                    );
1114                                })?;
1115
1116                            if let Err(err) = update::request_update(&op_manager, op).await {
1117                                report_op_init_error(
1118                                    &op_manager,
1119                                    op_id,
1120                                    &key,
1121                                    "UPDATE",
1122                                    &err,
1123                                    client_id,
1124                                    request_id,
1125                                )
1126                                .await;
1127                            }
1128                        }
1129                    }
1130                    ContractRequest::Get {
1131                        key,
1132                        return_contract_code,
1133                        subscribe,
1134                        blocking_subscribe,
1135                    } => {
1136                        let peer_id = ensure_peer_ready(&op_manager)?;
1137
1138                        // Query local store first. We use the result in two cases:
1139                        // 1. Error handling: if local storage has issues, fail fast
1140                        // 2. No connections: if isolated (no peers), return local cache immediately
1141                        //
1142                        // For connected nodes, we use smart cache routing: return local cache
1143                        // if subscribed (cache is fresh), otherwise fetch from network.
1144                        // See PR #2388 for why always-local-first was problematic.
1145                        let (full_key, state, contract) = match op_manager
1146                            .notify_contract_handler(ContractHandlerEvent::GetQuery {
1147                                instance_id: key,
1148                                return_contract_code,
1149                            })
1150                            .await
1151                        {
1152                            Ok(ContractHandlerEvent::GetResponse {
1153                                key: Some(full_key),
1154                                response: Ok(StoreResponse { state, contract }),
1155                            }) => (Some(full_key), state, contract),
1156                            Ok(ContractHandlerEvent::GetResponse {
1157                                key: None,
1158                                response:
1159                                    Ok(StoreResponse {
1160                                        state: None,
1161                                        contract: None,
1162                                    }),
1163                            }) => (None, None, None), // Contract not found locally
1164                            Ok(ContractHandlerEvent::GetResponse {
1165                                response: Err(err), ..
1166                            }) => {
1167                                tracing::error!(
1168                                    client_id = %client_id,
1169                                    request_id = %request_id,
1170                                    %key,
1171                                    error = %err,
1172                                    phase = "error",
1173                                    "GET query failed (executor error)"
1174                                );
1175                                return Err(Error::Executor(err));
1176                            }
1177                            Err(err) => {
1178                                tracing::error!(
1179                                    client_id = %client_id,
1180                                    request_id = %request_id,
1181                                    %key,
1182                                    error = %err,
1183                                    phase = "error",
1184                                    "GET query failed (contract error)"
1185                                );
1186                                return Err(Error::Contract(err));
1187                            }
1188                            Ok(_) => {
1189                                tracing::error!(
1190                                    client_id = %client_id,
1191                                    request_id = %request_id,
1192                                    %key,
1193                                    phase = "error",
1194                                    "GET query failed (unexpected state)"
1195                                );
1196                                return Err(Error::Op(OpError::UnexpectedOpState));
1197                            }
1198                        };
1199
1200                        // Determine whether to route through network or return local cache.
1201                        //
1202                        // The key insight: if we're actively subscribed to a contract (is_seeding_contract),
1203                        // our local cache is kept fresh via subscription updates. So we can return it.
1204                        // If we're NOT subscribed, our cache may be stale and we should fetch from network.
1205                        //
1206                        // This fixes the stale cache bug (PR #2388) while avoiding the performance
1207                        // regression of routing ALL GETs through network.
1208                        let connection_count = op_manager.ring.open_connections();
1209                        let has_local_state = full_key.is_some() && state.is_some();
1210                        let local_satisfies_request =
1211                            has_local_state && (!return_contract_code || contract.is_some());
1212
1213                        // Check if we should seed this contract (have an active subscription,
1214                        // client subscriptions, or it's in our seeding cache).
1215                        // If so, our cache is kept fresh via proximity cache updates.
1216                        let is_subscribed = full_key
1217                            .as_ref()
1218                            .map(|k| op_manager.ring.should_seed(k))
1219                            .unwrap_or(false);
1220
1221                        // Return local cache if we have valid state AND EITHER:
1222                        // 1. No connections (isolated node - can only use local cache), OR
1223                        // 2. Actively subscribed (cache is fresh via subscription updates)
1224                        if local_satisfies_request && (connection_count == 0 || is_subscribed) {
1225                            let full_key = full_key.unwrap();
1226                            let state = state.unwrap();
1227
1228                            tracing::debug!(
1229                                client_id = %client_id,
1230                                request_id = %request_id,
1231                                peer = %peer_id,
1232                                contract = %full_key,
1233                                is_subscribed,
1234                                connection_count,
1235                                phase = "local_cache",
1236                                "Returning locally cached contract state (subscribed or isolated)"
1237                            );
1238
1239                            // Handle subscription for locally found contracts
1240                            if subscribe {
1241                                if let Some(subscription_listener) = subscription_listener {
1242                                    register_subscription_listener(
1243                                        &op_manager,
1244                                        *full_key.id(),
1245                                        client_id,
1246                                        subscription_listener,
1247                                        "local GET",
1248                                    )
1249                                    .await?;
1250                                } else {
1251                                    tracing::warn!(
1252                                        client_id = %client_id,
1253                                        contract = %full_key,
1254                                        "GET with subscribe=true but no subscription_listener"
1255                                    );
1256                                }
1257                            }
1258
1259                            return Ok(Some(Either::Left(QueryResult::GetResult {
1260                                key: full_key,
1261                                state,
1262                                contract,
1263                            })));
1264                        }
1265
1266                        // Route through network when:
1267                        // 1. We don't have local cache, OR
1268                        // 2. We have local cache but are NOT subscribed (cache may be stale)
1269                        if let Some(router) = &request_router {
1270                            tracing::debug!(
1271                                client_id = %client_id,
1272                                request_id = %request_id,
1273                                peer = %peer_id,
1274                                contract = %key,
1275                                has_local = has_local_state,
1276                                is_subscribed,
1277                                connection_count,
1278                                phase = "network_routing",
1279                                "Routing GET request through network (not subscribed or no local cache)"
1280                            );
1281
1282                            let request = crate::node::DeduplicatedRequest::Get {
1283                                key,
1284                                return_contract_code,
1285                                subscribe,
1286                                blocking_subscribe,
1287                                client_id,
1288                                request_id,
1289                            };
1290
1291                            let (transaction_id, should_start_operation) =
1292                                router.route_request(request).await.map_err(|e| {
1293                                    Error::Node(format!("Request routing failed: {}", e))
1294                                })?;
1295
1296                            // Always register this client for the result
1297                            op_manager
1298                                .ch_outbound
1299                                .waiting_for_transaction_result(
1300                                    transaction_id,
1301                                    client_id,
1302                                    request_id,
1303                                )
1304                                .await
1305                                .inspect_err(|err| {
1306                                    tracing::error!(
1307                                        "Error waiting for transaction result (get): {}",
1308                                        err
1309                                    );
1310                                })?;
1311
1312                            // Only start new network operation if this is a new operation
1313                            if should_start_operation {
1314                                tracing::debug!(
1315                                    client_id = %client_id,
1316                                    request_id = %request_id,
1317                                    tx = %transaction_id,
1318                                    peer = %peer_id,
1319                                    contract = %key,
1320                                    phase = "new_operation",
1321                                    "Starting new GET network operation"
1322                                );
1323
1324                                let op = get::start_op_with_id(
1325                                    key,
1326                                    return_contract_code,
1327                                    subscribe,
1328                                    blocking_subscribe,
1329                                    transaction_id,
1330                                );
1331
1332                                if let Err(err) = get::request_get(
1333                                    &op_manager,
1334                                    op,
1335                                    VisitedPeers::new(&transaction_id),
1336                                )
1337                                .await
1338                                {
1339                                    report_op_init_error(
1340                                        &op_manager,
1341                                        transaction_id,
1342                                        &key,
1343                                        "GET",
1344                                        &err,
1345                                        client_id,
1346                                        request_id,
1347                                    )
1348                                    .await;
1349                                } else if subscribe {
1350                                    if let Some(sl) = subscription_listener {
1351                                        register_subscription_listener(
1352                                            &op_manager,
1353                                            key,
1354                                            client_id,
1355                                            sl,
1356                                            "GET",
1357                                        )
1358                                        .await?;
1359                                    } else {
1360                                        tracing::warn!(
1361                                            client_id = %client_id,
1362                                            contract = %key,
1363                                            "GET with subscribe=true but no subscription_listener"
1364                                        );
1365                                    }
1366                                }
1367                            } else {
1368                                tracing::debug!(
1369                                    client_id = %client_id,
1370                                    request_id = %request_id,
1371                                    tx = %transaction_id,
1372                                    peer = %peer_id,
1373                                    contract = %key,
1374                                    phase = "reuse",
1375                                    "Reusing existing GET operation - client registered for result"
1376                                );
1377
1378                                // Register subscription listener for reused operations too
1379                                if subscribe {
1380                                    if let Some(subscription_listener) = subscription_listener {
1381                                        register_subscription_listener(
1382                                            &op_manager,
1383                                            key,
1384                                            client_id,
1385                                            subscription_listener,
1386                                            "network GET",
1387                                        )
1388                                        .await?;
1389                                    } else {
1390                                        tracing::warn!(
1391                                            client_id = %client_id,
1392                                            contract = %key,
1393                                            "GET with subscribe=true but no subscription_listener"
1394                                        );
1395                                    }
1396                                }
1397                            }
1398                        } else {
1399                            tracing::debug!(
1400                                client_id = %client_id,
1401                                request_id = %request_id,
1402                                peer = %peer_id,
1403                                contract = %key,
1404                                phase = "legacy",
1405                                "Contract not found locally, starting direct GET operation (legacy mode)"
1406                            );
1407
1408                            // Legacy mode: direct operation without deduplication
1409                            let op = get::start_op(
1410                                key,
1411                                return_contract_code,
1412                                subscribe,
1413                                blocking_subscribe,
1414                            );
1415                            let op_id = op.id;
1416
1417                            op_manager
1418                                .ch_outbound
1419                                .waiting_for_transaction_result(op_id, client_id, request_id)
1420                                .await
1421                                .inspect_err(|err| {
1422                                    tracing::error!(
1423                                        client_id = %client_id,
1424                                        request_id = %request_id,
1425                                        tx = %op_id,
1426                                        error = %err,
1427                                        "Error waiting for transaction result"
1428                                    )
1429                                })?;
1430
1431                            if let Err(err) =
1432                                get::request_get(&op_manager, op, VisitedPeers::new(&op_id)).await
1433                            {
1434                                report_op_init_error(
1435                                    &op_manager,
1436                                    op_id,
1437                                    &key,
1438                                    "GET",
1439                                    &err,
1440                                    client_id,
1441                                    request_id,
1442                                )
1443                                .await;
1444                            }
1445
1446                            if subscribe {
1447                                if let Some(subscription_listener) = subscription_listener {
1448                                    register_subscription_listener(
1449                                        &op_manager,
1450                                        key,
1451                                        client_id,
1452                                        subscription_listener,
1453                                        "GET",
1454                                    )
1455                                    .await?;
1456                                } else {
1457                                    tracing::warn!(
1458                                        client_id = %client_id,
1459                                        contract = %key,
1460                                        "GET with subscribe=true but no subscription_listener"
1461                                    );
1462                                }
1463                            }
1464                        }
1465                    }
1466                    ContractRequest::Subscribe { key, summary } => {
1467                        let peer_id = ensure_peer_ready(&op_manager)?;
1468
1469                        tracing::debug!(
1470                            client_id = %client_id,
1471                            request_id = %request_id,
1472                            peer = %peer_id,
1473                            contract = %key,
1474                            phase = "request",
1475                            "Received SUBSCRIBE request from client"
1476                        );
1477
1478                        let Some(subscriber_listener) = subscription_listener else {
1479                            tracing::error!(
1480                                client_id = %client_id,
1481                                request_id = %request_id,
1482                                contract = %key,
1483                                "No subscriber listener for SUBSCRIBE request"
1484                            );
1485                            return Ok(None);
1486                        };
1487
1488                        let register_listener = op_manager
1489                            .notify_contract_handler(
1490                                ContractHandlerEvent::RegisterSubscriberListener {
1491                                    key,
1492                                    client_id,
1493                                    summary,
1494                                    subscriber_listener,
1495                                },
1496                            )
1497                            .await
1498                            .inspect_err(|err| {
1499                                tracing::error!(
1500                                    client_id = %client_id,
1501                                    request_id = %request_id,
1502                                    contract = %key,
1503                                    error = %err,
1504                                    "Register subscriber listener error"
1505                                );
1506                            });
1507                        match register_listener {
1508                            Ok(ContractHandlerEvent::RegisterSubscriberListenerResponse) => {
1509                                tracing::debug!(
1510                                    client_id = %client_id,
1511                                    request_id = %request_id,
1512                                    contract = %key,
1513                                    phase = "listener_registered",
1514                                    "Subscriber listener registered successfully"
1515                                );
1516                                // Register client subscription to enable subscription tree pruning on disconnect
1517                                let result =
1518                                    op_manager.ring.add_client_subscription(&key, client_id);
1519                                // Emit telemetry if this was the first client (seeding started)
1520                                if result.is_first_client {
1521                                    if let Some(event) =
1522                                        NetEventLog::seeding_started(&op_manager.ring, key)
1523                                    {
1524                                        op_manager.ring.register_events(Either::Left(event)).await;
1525                                    }
1526                                }
1527                            }
1528                            _ => {
1529                                tracing::error!(
1530                                    client_id = %client_id,
1531                                    request_id = %request_id,
1532                                    contract = %key,
1533                                    phase = "registration_failed",
1534                                    "Subscriber listener registration failed"
1535                                );
1536                                return Err(Error::Op(OpError::UnexpectedOpState));
1537                            }
1538                        }
1539
1540                        // Now start the network subscription operation
1541                        // SUBSCRIBE: Skip router deduplication due to instant-completion race conditions
1542                        // When contracts are local, Subscribe completes instantly which breaks deduplication:
1543                        // - Client 1 subscribes → operation completes → result delivered → TX removed
1544                        // - Client 2 subscribes → tries to reuse TX → but TX already gone
1545                        // Solution: Each client gets their own Subscribe operation (they're lightweight)
1546                        if let Some(_router) = &request_router {
1547                            tracing::debug!(
1548                                client_id = %client_id,
1549                                request_id = %request_id,
1550                                peer = %peer_id,
1551                                contract = %key,
1552                                phase = "no_dedup",
1553                                "Processing SUBSCRIBE without deduplication (instant-completion race avoidance)"
1554                            );
1555
1556                            // Create operation with new transaction ID
1557                            let tx = crate::message::Transaction::new::<
1558                                crate::operations::subscribe::SubscribeMsg,
1559                            >();
1560
1561                            // CRITICAL: Register BEFORE starting operation to avoid race with instant-completion
1562                            use crate::contract::WaitingTransaction;
1563                            op_manager
1564                                .ch_outbound
1565                                .waiting_for_transaction_result(
1566                                    WaitingTransaction::Transaction(tx),
1567                                    client_id,
1568                                    request_id,
1569                                )
1570                                .await
1571                                .inspect_err(|err| {
1572                                    tracing::error!(
1573                                        "Error waiting for transaction result: {}",
1574                                        err
1575                                    );
1576                                })?;
1577
1578                            // Start dedicated operation for this client AFTER registration
1579                            // is_renewal: false - client-initiated subscriptions are new
1580                            let _result_tx = crate::node::subscribe_with_id(
1581                                op_manager.clone(),
1582                                key,
1583                                None, // No legacy registration
1584                                Some(tx),
1585                                false,
1586                            )
1587                            .await
1588                            .inspect_err(|err| {
1589                                tracing::error!(
1590                                    client_id = %client_id,
1591                                    request_id = %request_id,
1592                                    tx = %tx,
1593                                    contract = %key,
1594                                    error = %err,
1595                                    phase = "error",
1596                                    "SUBSCRIBE operation failed"
1597                                );
1598                            })?;
1599
1600                            tracing::debug!(
1601                                request_id = %request_id,
1602                                transaction_id = %tx,
1603                                operation = "subscribe",
1604                                "SUBSCRIBE operation started with dedicated transaction for this client"
1605                            );
1606                        } else {
1607                            tracing::debug!(
1608                                peer_id = %peer_id,
1609                                key = %key,
1610                                "Starting direct SUBSCRIBE operation",
1611                            );
1612
1613                            // Generate transaction, register first, then run op
1614                            let tx = crate::message::Transaction::new::<
1615                                crate::operations::subscribe::SubscribeMsg,
1616                            >();
1617
1618                            op_manager
1619                                .ch_outbound
1620                                .waiting_for_transaction_result(tx, client_id, request_id)
1621                                .await
1622                                .inspect_err(|err| {
1623                                    tracing::error!(
1624                                        "Error waiting for transaction result: {}",
1625                                        err
1626                                    );
1627                                })?;
1628
1629                            // is_renewal: false - client-initiated subscriptions are new
1630                            crate::node::subscribe_with_id(
1631                                op_manager.clone(),
1632                                key,
1633                                None,
1634                                Some(tx),
1635                                false,
1636                            )
1637                            .await
1638                            .inspect_err(|err| {
1639                                tracing::error!(
1640                                    client_id = %client_id,
1641                                    request_id = %request_id,
1642                                    tx = %tx,
1643                                    contract = %key,
1644                                    error = %err,
1645                                    phase = "error",
1646                                    "SUBSCRIBE operation failed"
1647                                );
1648                            })?;
1649
1650                            tracing::debug!(
1651                                request_id = %request_id,
1652                                transaction_id = %tx,
1653                                operation = "subscribe",
1654                                "Request-Transaction correlation"
1655                            );
1656                        }
1657                    }
1658                    _ => {
1659                        tracing::error!(
1660                            client_id = %client_id,
1661                            request_id = %request_id,
1662                            "Unsupported contract operation"
1663                        );
1664                    }
1665                }
1666            }
1667            ClientRequest::DelegateOp(req) => {
1668                tracing::debug!(
1669                    client_id = %client_id,
1670                    request_id = %request_id,
1671                    phase = "request",
1672                    "Received delegate operation from client"
1673                );
1674                let delegate_key = req.key().clone();
1675                let attested_contract = request.attested_contract;
1676
1677                let res = match op_manager
1678                    .notify_contract_handler(ContractHandlerEvent::DelegateRequest {
1679                        req,
1680                        attested_contract,
1681                    })
1682                    .await
1683                {
1684                    Ok(ContractHandlerEvent::DelegateResponse(res)) => res,
1685                    Err(err) => {
1686                        tracing::error!(
1687                            client_id = %client_id,
1688                            request_id = %request_id,
1689                            delegate = %delegate_key,
1690                            error = %err,
1691                            phase = "error",
1692                            "Delegate operation failed (contract error)"
1693                        );
1694                        return Err(Error::Contract(err));
1695                    }
1696                    Ok(_) => {
1697                        tracing::error!(
1698                            client_id = %client_id,
1699                            request_id = %request_id,
1700                            delegate = %delegate_key,
1701                            phase = "error",
1702                            "Delegate operation failed (unexpected state)"
1703                        );
1704                        return Err(Error::Op(OpError::UnexpectedOpState));
1705                    }
1706                };
1707
1708                let host_response = Ok(HostResponse::DelegateResponse {
1709                    key: delegate_key.clone(),
1710                    values: res,
1711                });
1712
1713                if let Some(ch) = &subscription_listener {
1714                    if ch.send(host_response).is_err() {
1715                        tracing::error!(
1716                            client_id = %client_id,
1717                            request_id = %request_id,
1718                            delegate = %delegate_key,
1719                            "Failed to send delegate response through subscription channel"
1720                        );
1721                    }
1722                    return Ok(None);
1723                }
1724
1725                // Return the response to be sent by client_event_handling
1726                return Ok(Some(Either::Left(QueryResult::DelegateResult {
1727                    key: delegate_key,
1728                    response: host_response,
1729                })));
1730            }
1731            ClientRequest::Disconnect { .. } => {
1732                tracing::debug!(
1733                    client_id = %client_id,
1734                    request_id = %request_id,
1735                    "Received disconnect request from client, triggering subscription cleanup"
1736                );
1737
1738                let result = op_manager
1739                    .ring
1740                    .remove_client_from_all_subscriptions(client_id);
1741                for contract in &result.affected_contracts {
1742                    op_manager.interest_manager.remove_local_client(contract);
1743                }
1744                if !result.affected_contracts.is_empty() {
1745                    tracing::debug!(
1746                        %client_id,
1747                        subscriptions_cleaned = result.affected_contracts.len(),
1748                        "Cleaned up client subscriptions and interest tracking"
1749                    );
1750                }
1751
1752                // Send Unsubscribe upstream for contracts with no remaining interest
1753                for contract in &result.affected_contracts {
1754                    if op_manager.ring.should_unsubscribe_upstream(contract) {
1755                        let op_mgr = op_manager.clone();
1756                        let contract = *contract;
1757                        GlobalExecutor::spawn(async move {
1758                            op_mgr.send_unsubscribe_upstream(&contract).await;
1759                        });
1760                    }
1761                }
1762
1763                // Notify contract handler to clean up shared_summaries and
1764                // shared_notifications for this client (fire-and-forget — no response needed).
1765                if let Err(err) = op_manager.ch_outbound.send_to_handler_fire_and_forget(
1766                    ContractHandlerEvent::ClientDisconnect { client_id },
1767                ) {
1768                    tracing::warn!(
1769                        %client_id,
1770                        error = %err,
1771                        "Failed to notify contract handler of client disconnect"
1772                    );
1773                }
1774            }
1775            ClientRequest::NodeQueries(query) => {
1776                tracing::debug!(
1777                    client_id = %client_id,
1778                    request_id = %request_id,
1779                    query = ?query,
1780                    "Received node query from client"
1781                );
1782
1783                let Some(tx) = callback_tx else {
1784                    tracing::error!(
1785                        client_id = %client_id,
1786                        request_id = %request_id,
1787                        "callback_tx not available for NodeQueries"
1788                    );
1789                    unreachable!("callback_tx should always be Some for NodeQueries based on initialization logic");
1790                };
1791
1792                let node_event = match query {
1793                    freenet_stdlib::client_api::NodeQuery::ConnectedPeers => {
1794                        NodeEvent::QueryConnections { callback: tx }
1795                    }
1796                    freenet_stdlib::client_api::NodeQuery::SubscriptionInfo => {
1797                        NodeEvent::QuerySubscriptions { callback: tx }
1798                    }
1799                    freenet_stdlib::client_api::NodeQuery::NodeDiagnostics { config } => {
1800                        NodeEvent::QueryNodeDiagnostics {
1801                            config,
1802                            callback: tx,
1803                        }
1804                    }
1805                    freenet_stdlib::client_api::NodeQuery::ProximityCacheInfo => {
1806                        // TODO: Implement proximity cache info query
1807                        tracing::warn!(
1808                            client_id = %client_id,
1809                            request_id = %request_id,
1810                            "ProximityCacheInfo query not yet implemented"
1811                        );
1812                        return Ok(None);
1813                    }
1814                };
1815
1816                if let Err(err) = op_manager.notify_node_event(node_event).await {
1817                    tracing::error!(
1818                        client_id = %client_id,
1819                        request_id = %request_id,
1820                        error = %err,
1821                        "notify_node_event error"
1822                    );
1823                    return Err(Error::from(err));
1824                }
1825
1826                return Ok(Some(Either::Right(callback_rx.unwrap())));
1827            }
1828            ClientRequest::Close => {
1829                return Err(Error::Disconnected);
1830            }
1831            ClientRequest::Authenticate { .. } | _ => {
1832                tracing::error!(
1833                    client_id = %client_id,
1834                    request_id = %request_id,
1835                    "Unsupported operation"
1836                );
1837            }
1838        }
1839        Ok(None)
1840    };
1841
1842    GlobalExecutor::spawn(fut.instrument(tracing::info_span!(
1843        parent: tracing::Span::current(),
1844        "process_client_request"
1845    )))
1846    .map(|res| match res {
1847        Ok(Ok(res)) => Ok(res),
1848        Ok(Err(err)) => Err(err),
1849        Err(err) => {
1850            tracing::error!(
1851                error = %err,
1852                "Error processing client request (task panic)"
1853            );
1854            Err(Error::from(err))
1855        }
1856    })
1857    .boxed()
1858}
1859
1860pub(crate) mod test {
1861    use std::{
1862        collections::{HashMap, HashSet},
1863        time::Duration,
1864    };
1865
1866    use freenet_stdlib::{
1867        client_api::{ContractRequest, ErrorKind},
1868        prelude::*,
1869    };
1870    use futures::{FutureExt, StreamExt};
1871    use rand::SeedableRng;
1872    use tokio::net::TcpStream;
1873    use tokio::sync::watch::Receiver;
1874    use tokio::sync::Mutex;
1875    use tokio_tungstenite::tungstenite::Message;
1876    use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
1877
1878    use crate::{node::testing_impl::EventId, transport::TransportPublicKey};
1879
1880    use super::*;
1881
1882    pub struct MemoryEventsGen<R = rand::rngs::SmallRng> {
1883        key: TransportPublicKey,
1884        signal: Receiver<(EventId, TransportPublicKey)>,
1885        events_to_gen: HashMap<EventId, ClientRequest<'static>>,
1886        rng: Option<R>,
1887        internal_state: Option<InternalGeneratorState>,
1888        /// Keeps subscription notification receivers alive so the sender half
1889        /// (passed as `notification_channel`) isn't immediately broken.
1890        #[allow(dead_code)]
1891        subscription_receivers: Vec<tokio::sync::mpsc::UnboundedReceiver<HostResult>>,
1892    }
1893
1894    impl<R> MemoryEventsGen<R>
1895    where
1896        R: RandomEventGenerator,
1897    {
1898        pub fn new_with_seed(
1899            signal: Receiver<(EventId, TransportPublicKey)>,
1900            key: TransportPublicKey,
1901            seed: u64,
1902        ) -> Self {
1903            Self {
1904                signal,
1905                key,
1906                events_to_gen: HashMap::new(),
1907                rng: Some(R::seed_from_u64(seed)),
1908                internal_state: None,
1909                subscription_receivers: Vec::new(),
1910            }
1911        }
1912
1913        pub fn rng_params(
1914            &mut self,
1915            id: usize,
1916            num_peers: usize,
1917            max_contract_num: usize,
1918            iterations: usize,
1919        ) {
1920            let internal_state = InternalGeneratorState {
1921                this_peer: id,
1922                num_peers,
1923                max_contract_num,
1924                max_iterations: iterations,
1925                ..Default::default()
1926            };
1927            self.internal_state = Some(internal_state);
1928        }
1929
1930        async fn generate_rand_event(&mut self) -> Option<ClientRequest<'static>> {
1931            let (mut rng, mut state) = self
1932                .rng
1933                .take()
1934                .zip(self.internal_state.take())
1935                .expect("rng should be set");
1936
1937            // Run synchronously - gen_event is fast (just RNG operations) and
1938            // running synchronously ensures deterministic execution under turmoil.
1939            // spawn_blocking would use a real thread pool outside turmoil's control.
1940            let res = rng.gen_event(&mut state);
1941
1942            self.rng = Some(rng);
1943            self.internal_state = Some(state);
1944            res
1945        }
1946    }
1947
1948    impl MemoryEventsGen {
1949        #[cfg(any(test, feature = "testing"))]
1950        pub fn new(
1951            signal: Receiver<(EventId, TransportPublicKey)>,
1952            key: TransportPublicKey,
1953        ) -> Self {
1954            Self {
1955                signal,
1956                key,
1957                events_to_gen: HashMap::new(),
1958                rng: None,
1959                internal_state: None,
1960                subscription_receivers: Vec::new(),
1961            }
1962        }
1963    }
1964
1965    impl<R> MemoryEventsGen<R> {
1966        #[cfg(any(test, feature = "testing"))]
1967        pub fn generate_events(
1968            &mut self,
1969            events: impl IntoIterator<Item = (EventId, ClientRequest<'static>)>,
1970        ) {
1971            self.events_to_gen.extend(events)
1972        }
1973
1974        fn generate_deterministic_event(&mut self, id: &EventId) -> Option<ClientRequest<'static>> {
1975            self.events_to_gen.remove(id)
1976        }
1977
1978        /// Wraps a generated request into an `OpenRequest`, attaching a notification
1979        /// channel when the request is a Subscribe, Put-with-subscribe, or Get-with-subscribe.
1980        fn build_open_request(
1981            &mut self,
1982            request: Box<ClientRequest<'static>>,
1983        ) -> OpenRequest<'static> {
1984            let notification_channel = match request.as_ref() {
1985                ClientRequest::ContractOp(ContractRequest::Subscribe { .. })
1986                | ClientRequest::ContractOp(ContractRequest::Put {
1987                    subscribe: true, ..
1988                })
1989                | ClientRequest::ContractOp(ContractRequest::Get {
1990                    subscribe: true, ..
1991                }) => {
1992                    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
1993                    self.subscription_receivers.push(rx);
1994                    Some(tx)
1995                }
1996                ClientRequest::DelegateOp(_)
1997                | ClientRequest::ContractOp(_)
1998                | ClientRequest::Disconnect { .. }
1999                | ClientRequest::Authenticate { .. }
2000                | ClientRequest::NodeQueries(_)
2001                | ClientRequest::Close
2002                | _ => None,
2003            };
2004            OpenRequest {
2005                client_id: ClientId::FIRST,
2006                request_id: RequestId::new(),
2007                request,
2008                notification_channel,
2009                token: None,
2010                attested_contract: None,
2011            }
2012            .into_owned()
2013        }
2014    }
2015
2016    impl<R> ClientEventsProxy for MemoryEventsGen<R>
2017    where
2018        R: RandomEventGenerator + Send,
2019    {
2020        fn recv(&mut self) -> BoxFuture<'_, Result<OpenRequest<'static>, ClientError>> {
2021            async {
2022                loop {
2023                    if self.signal.changed().await.is_ok() {
2024                        let (ev_id, pk) = self.signal.borrow().clone();
2025                        if self.rng.is_some() && pk == self.key {
2026                            let request = self
2027                                .generate_rand_event()
2028                                .await
2029                                .ok_or_else(|| ClientError::from(ErrorKind::Disconnect))?;
2030                            return Ok(self.build_open_request(request.into()));
2031                        } else if pk == self.key {
2032                            let request = self
2033                                .generate_deterministic_event(&ev_id)
2034                                .expect("event not found");
2035                            return Ok(self.build_open_request(request.into()));
2036                        }
2037                    } else {
2038                        // probably the process finished, wait for a bit and then kill the thread
2039                        tokio::time::sleep(Duration::from_secs(1)).await;
2040                        break Err(ErrorKind::Shutdown.into());
2041                    }
2042                }
2043            }
2044            .boxed()
2045        }
2046
2047        fn send(
2048            &mut self,
2049            _id: ClientId,
2050            response: Result<HostResponse, ClientError>,
2051        ) -> BoxFuture<'_, Result<(), ClientError>> {
2052            if let Ok(HostResponse::ContractResponse(ContractResponse::GetResponse {
2053                key, ..
2054            })) = response
2055            {
2056                self.internal_state
2057                    .as_mut()
2058                    .expect("state should be set")
2059                    .owns_contracts
2060                    .insert(key);
2061            }
2062            async { Ok(()) }.boxed()
2063        }
2064    }
2065
2066    pub struct NetworkEventGenerator<R = rand::rngs::SmallRng> {
2067        id: TransportPublicKey,
2068        memory_event_generator: MemoryEventsGen<R>,
2069        ws_client: Arc<Mutex<WebSocketStream<MaybeTlsStream<TcpStream>>>>,
2070    }
2071
2072    impl<R> NetworkEventGenerator<R>
2073    where
2074        R: RandomEventGenerator,
2075    {
2076        pub fn new(
2077            id: TransportPublicKey,
2078            memory_event_generator: MemoryEventsGen<R>,
2079            ws_client: Arc<Mutex<WebSocketStream<MaybeTlsStream<TcpStream>>>>,
2080        ) -> Self {
2081            Self {
2082                id,
2083                memory_event_generator,
2084                ws_client,
2085            }
2086        }
2087    }
2088
2089    impl<R> ClientEventsProxy for NetworkEventGenerator<R>
2090    where
2091        R: RandomEventGenerator + Send + Clone,
2092    {
2093        fn recv(&mut self) -> BoxFuture<'_, HostIncomingMsg> {
2094            let ws_client_clone = self.ws_client.clone();
2095
2096            async move {
2097                loop {
2098                    let message = {
2099                        let mut lock = ws_client_clone.try_lock().inspect_err(|_| {
2100                            tracing::error!(peer = %self.id, "failed to lock ws client");
2101                        }).inspect(|_| {
2102                            tracing::debug!(peer = %self.id, "locked ws client");
2103                        }).unwrap();
2104                        lock.next().await
2105                    };
2106
2107                    match message {
2108                        Some(Ok(Message::Binary(data))) => {
2109                            if let Ok((id, pub_key)) =
2110                            bincode::deserialize::<(EventId, TransportPublicKey)>(&data)
2111                            {
2112                                tracing::debug!(peer = %self.id, %id, "Received event from the supervisor");
2113                                if pub_key == self.id {
2114                                    let request = self
2115                                        .memory_event_generator
2116                                        .generate_rand_event()
2117                                        .await
2118                                        .ok_or_else(|| {
2119                                            ClientError::from(ErrorKind::Disconnect)
2120                                        })?;
2121                                    return Ok(self
2122                                        .memory_event_generator
2123                                        .build_open_request(request.into()));
2124                                }
2125                            } else {
2126                                continue;
2127                            }
2128                        }
2129                        None => {
2130                            return Err(ClientError::from(ErrorKind::Disconnect));
2131                        }
2132                        _ => continue,
2133                    }
2134                }
2135            }
2136            .boxed()
2137        }
2138
2139        fn send(
2140            &mut self,
2141            _id: ClientId,
2142            response: Result<HostResponse, ClientError>,
2143        ) -> BoxFuture<'_, Result<(), ClientError>> {
2144            if let Ok(HostResponse::ContractResponse(ContractResponse::GetResponse {
2145                key, ..
2146            })) = response
2147            {
2148                self.memory_event_generator
2149                    .internal_state
2150                    .as_mut()
2151                    .expect("state should be set")
2152                    .owns_contracts
2153                    .insert(key);
2154            }
2155            async { Ok(()) }.boxed()
2156        }
2157    }
2158
2159    #[derive(Default)]
2160    pub struct InternalGeneratorState {
2161        this_peer: usize,
2162        num_peers: usize,
2163        current_iteration: usize,
2164        max_iterations: usize,
2165        max_contract_num: usize,
2166        owns_contracts: HashSet<ContractKey>,
2167        existing_contracts: Vec<ContractContainer>,
2168    }
2169
2170    pub trait RandomEventGenerator: Send + 'static {
2171        fn seed_from_u64(seed: u64) -> Self;
2172
2173        fn gen_u8(&mut self) -> u8;
2174
2175        fn gen_range(&mut self, range: std::ops::Range<usize>) -> usize;
2176
2177        fn choose<'a, T>(&mut self, vec: &'a [T]) -> Option<&'a T>;
2178
2179        fn choose_random_from_iter<'a, T>(
2180            &mut self,
2181            mut iter: impl ExactSizeIterator<Item = &'a T> + 'a,
2182        ) -> Option<&'a T> {
2183            let len = iter.len();
2184            let idx = self.gen_range(0..len);
2185            iter.nth(idx)
2186        }
2187
2188        /// The goal of this function is to generate a random event that is valid for the current
2189        /// global state of the network.
2190        ///
2191        /// In order to do this all peers must replicate the same exact events so they are aware
2192        /// of the current global state (basically which contracts have been created so far).
2193        ///
2194        /// To guarantee this make sure that calls to this rng are always executed in the same order
2195        /// at all peers.
2196        fn gen_event(
2197            &mut self,
2198            state: &mut InternalGeneratorState,
2199        ) -> Option<ClientRequest<'static>> {
2200            while state.current_iteration < state.max_iterations {
2201                state.current_iteration += 1;
2202                let for_this_peer = self.gen_range(0..state.num_peers) == state.this_peer;
2203                match self.gen_range(0..100) {
2204                    val if (0..5).contains(&val) => {
2205                        if state.max_contract_num <= state.existing_contracts.len() {
2206                            continue;
2207                        }
2208                        let contract = self.gen_contract_container();
2209                        let request = ContractRequest::Put {
2210                            contract: contract.clone(),
2211                            state: WrappedState::new(self.random_byte_vec()),
2212                            related_contracts: RelatedContracts::new(),
2213                            subscribe: true,
2214                            blocking_subscribe: false,
2215                        };
2216                        state.existing_contracts.push(contract);
2217                        if !for_this_peer {
2218                            continue;
2219                        }
2220                        return Some(request.into());
2221                    }
2222                    val if (5..20).contains(&val) => {
2223                        if let Some(contract) = self.choose(&state.existing_contracts) {
2224                            if !for_this_peer {
2225                                continue;
2226                            }
2227
2228                            let request = ContractRequest::Put {
2229                                contract: contract.clone(),
2230                                state: WrappedState::new(self.random_byte_vec()),
2231                                related_contracts: RelatedContracts::new(),
2232                                // Subscribe to ensure this peer joins the subscription tree
2233                                // and can receive/propagate updates
2234                                subscribe: true,
2235                                blocking_subscribe: false,
2236                            };
2237
2238                            tracing::debug!(
2239                                "sending put to an existing contract with subscribe=true"
2240                            );
2241
2242                            return Some(request.into());
2243                        }
2244                    }
2245                    val if (20..35).contains(&val) => {
2246                        if let Some(contract) = self.choose(&state.existing_contracts) {
2247                            if !for_this_peer {
2248                                continue;
2249                            }
2250                            let key = contract.key();
2251                            let request = ContractRequest::Get {
2252                                key: *key.id(),
2253                                return_contract_code: true,
2254                                subscribe: false,
2255                                blocking_subscribe: false,
2256                            };
2257                            return Some(request.into());
2258                        }
2259                    }
2260                    val if (35..80).contains(&val) => {
2261                        let new_state = UpdateData::State(State::from(self.random_byte_vec()));
2262                        if let Some(contract) = self.choose(&state.existing_contracts) {
2263                            // TODO: It will be used when the delta updates are available
2264                            // let delta = UpdateData::Delta(StateDelta::from(self.random_byte_vec()));
2265                            if !for_this_peer {
2266                                continue;
2267                            }
2268                            let request = ContractRequest::Update {
2269                                key: contract.key(),
2270                                data: new_state,
2271                            };
2272                            if state.owns_contracts.contains(&contract.key()) {
2273                                return Some(request.into());
2274                            }
2275                        }
2276                    }
2277                    val if (80..100).contains(&val) => {
2278                        let summary = StateSummary::from(self.random_byte_vec());
2279
2280                        let Some(from_existing) = self.choose(state.existing_contracts.as_slice())
2281                        else {
2282                            continue;
2283                        };
2284
2285                        let key = from_existing.key();
2286                        if !for_this_peer {
2287                            continue;
2288                        }
2289                        let request = ContractRequest::Subscribe {
2290                            key: *key.id(),
2291                            summary: Some(summary),
2292                        };
2293                        return Some(request.into());
2294                    }
2295                    _ => unreachable!(
2296                        "gen_range(0..100) should always fall into one of the defined ranges"
2297                    ),
2298                }
2299            }
2300            None
2301        }
2302
2303        fn gen_contract_container(&mut self) -> ContractContainer {
2304            let code = ContractCode::from(self.random_byte_vec());
2305            let params = Parameters::from(self.random_byte_vec());
2306            ContractWasmAPIVersion::V1(WrappedContract::new(code.into(), params)).into()
2307        }
2308
2309        fn random_byte_vec(&mut self) -> Vec<u8> {
2310            (0..self.gen_u8())
2311                .map(|_| self.gen_u8())
2312                .collect::<Vec<_>>()
2313        }
2314    }
2315
2316    use rand::prelude::IndexedRandom;
2317
2318    impl RandomEventGenerator for rand::rngs::SmallRng {
2319        fn gen_u8(&mut self) -> u8 {
2320            <Self as rand::Rng>::random(self)
2321        }
2322
2323        fn gen_range(&mut self, range: std::ops::Range<usize>) -> usize {
2324            <Self as rand::Rng>::random_range(self, range)
2325        }
2326
2327        fn choose<'a, T>(&mut self, vec: &'a [T]) -> Option<&'a T> {
2328            vec.choose(self)
2329        }
2330
2331        fn seed_from_u64(seed: u64) -> Self {
2332            <Self as SeedableRng>::seed_from_u64(seed)
2333        }
2334    }
2335
2336    #[test]
2337    fn test_gen_event() {
2338        const NUM_PEERS: usize = 20;
2339        const ITERATIONS: usize = 10_000;
2340        let mut threads = vec![];
2341        for this_peer in 0..NUM_PEERS {
2342            let thread = std::thread::spawn(move || {
2343                let mut rng = <rand::rngs::SmallRng as RandomEventGenerator>::seed_from_u64(15_978);
2344                let mut state = InternalGeneratorState {
2345                    this_peer,
2346                    num_peers: NUM_PEERS,
2347                    max_contract_num: 10,
2348                    ..Default::default()
2349                };
2350                for _ in 0..ITERATIONS {
2351                    rng.gen_event(&mut state);
2352                }
2353                state
2354            });
2355            threads.push(thread);
2356        }
2357        let states = threads
2358            .into_iter()
2359            .map(|t| t.join().unwrap())
2360            .collect::<Vec<_>>();
2361
2362        let first_state = &states[0];
2363        for state in &states[1..] {
2364            assert_eq!(first_state.existing_contracts, state.existing_contracts);
2365        }
2366    }
2367}