1use either::Either;
4use freenet_stdlib::{
5 client_api::{
6 ClientError, ClientRequest, ContractRequest, ContractResponse, ErrorKind, HostResponse,
7 QueryResponse,
8 },
9 prelude::*,
10};
11use futures::stream::FuturesUnordered;
12use futures::{future::BoxFuture, FutureExt, StreamExt};
13use std::cell::Cell;
14use std::fmt::Display;
15use std::sync::Arc;
16use std::{convert::Infallible, fmt::Debug};
17use tracing::Instrument;
18
19use serde::{Deserialize, Serialize};
20use tokio::sync::mpsc::{self, UnboundedSender};
21
22use crate::contract::{ClientResponsesReceiver, ContractHandlerEvent};
23use crate::message::{NodeEvent, QueryResult};
24use crate::node::OpManager;
25use crate::operations::{get, put, update, OpError, VisitedPeers};
26use crate::ring::KnownPeerKeyLocation;
27use crate::tracing::NetEventLog;
28use crate::{
29 config::{GlobalExecutor, GlobalRng},
30 contract::StoreResponse,
31};
32
// Crate-visible submodule.
pub(crate) mod combinator;
// Compiled only for test builds.
#[cfg(test)]
mod integration_verification;
// Crate-visible submodule.
pub(crate) mod result_router;
// Crate-visible submodule.
pub(crate) mod session_actor;
// Compiled only for test builds.
#[cfg(test)]
mod test_correlation;
// Compiled only when the `websocket` cargo feature is enabled.
#[cfg(feature = "websocket")]
pub(crate) mod websocket;
43
/// Owned, sendable trait object wrapping any [`ClientEventsProxy`] implementation.
pub(crate) type BoxedClient = Box<dyn ClientEventsProxy + Send + 'static>;
/// Outcome delivered to a client: a [`HostResponse`] on success, a [`ClientError`] otherwise.
pub type HostResult = Result<HostResponse, ClientError>;
46
47#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
49#[repr(transparent)]
50pub struct RequestId(u64);
51
52const COUNTER_BLOCK: u64 = 1_000_000;
53
54thread_local! {
55 static REQUEST_ID_COUNTER: Cell<u64> = {
56 let idx = crate::config::GlobalRng::thread_index();
57 Cell::new(1 + idx * COUNTER_BLOCK)
58 };
59}
60
61impl RequestId {
62 pub fn new() -> Self {
63 Self(REQUEST_ID_COUNTER.with(|c| {
64 let v = c.get();
65 c.set(v + 1);
66 v
67 }))
68 }
69
70 pub fn reset_counter() {
73 let idx = crate::config::GlobalRng::thread_index();
74 REQUEST_ID_COUNTER.with(|c| c.set(1 + idx * COUNTER_BLOCK));
75 }
76}
77
78impl Default for RequestId {
79 fn default() -> Self {
80 Self::new()
81 }
82}
83
84impl std::fmt::Display for RequestId {
85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 write!(f, "req-{}", self.0)
87 }
88}
89
90#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
91#[repr(transparent)]
92pub struct ClientId(usize);
93
94impl From<ClientId> for usize {
95 fn from(val: ClientId) -> Self {
96 val.0
97 }
98}
99
100thread_local! {
101 static CLIENT_ID_COUNTER: Cell<usize> = {
102 let idx = crate::config::GlobalRng::thread_index();
103 Cell::new(1 + (idx as usize) * (COUNTER_BLOCK as usize))
104 };
105}
106
107impl ClientId {
108 pub const FIRST: Self = ClientId(0);
109
110 pub fn next() -> Self {
111 ClientId(CLIENT_ID_COUNTER.with(|c| {
112 let v = c.get();
113 c.set(v + 1);
114 v
115 }))
116 }
117
118 pub fn reset_counter() {
121 let idx = crate::config::GlobalRng::thread_index();
122 CLIENT_ID_COUNTER.with(|c| c.set(1 + (idx as usize) * (COUNTER_BLOCK as usize)));
123 }
124}
125
126impl Display for ClientId {
127 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128 write!(f, "{}", self.0)
129 }
130}
131
/// Item produced by [`ClientEventsProxy::recv`]: an owned request or a client-side error.
type HostIncomingMsg = Result<OpenRequest<'static>, ClientError>;
133
134#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
135pub struct AuthToken(#[serde(deserialize_with = "AuthToken::deser_auth_token")] Arc<str>);
136
137impl AuthToken {
138 pub fn as_str(&self) -> &str {
139 &self.0
140 }
141
142 pub fn generate() -> AuthToken {
143 let mut token = [0u8; 32];
144 GlobalRng::fill_bytes(&mut token);
145 let token_str = bs58::encode(token).into_string();
146 AuthToken::from(token_str)
147 }
148}
149
150impl std::ops::Deref for AuthToken {
151 type Target = str;
152
153 fn deref(&self) -> &Self::Target {
154 &self.0
155 }
156}
157
158impl AuthToken {
159 fn deser_auth_token<'de, D>(deser: D) -> Result<Arc<str>, D::Error>
160 where
161 D: serde::Deserializer<'de>,
162 {
163 let value = <String as Deserialize>::deserialize(deser)?;
164 Ok(value.into())
165 }
166}
167
168impl From<String> for AuthToken {
169 fn from(value: String) -> Self {
170 Self(value.into())
171 }
172}
173
174#[non_exhaustive]
175pub struct OpenRequest<'a> {
176 pub client_id: ClientId,
177 pub request_id: RequestId,
178 pub request: Box<ClientRequest<'a>>,
179 pub notification_channel: Option<UnboundedSender<HostResult>>,
180 pub token: Option<AuthToken>,
181 pub attested_contract: Option<ContractInstanceId>,
182}
183
184impl Display for OpenRequest<'_> {
185 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
186 write!(
187 f,
188 "client request {{ client: {}, request_id: {}, req: {} }}",
189 &self.client_id, &self.request_id, &*self.request
190 )
191 }
192}
193
194impl<'a> OpenRequest<'a> {
195 pub fn into_owned(self) -> OpenRequest<'static> {
196 OpenRequest {
197 request: Box::new(self.request.into_owned()),
198 ..self
199 }
200 }
201
202 pub fn new(id: ClientId, request: Box<ClientRequest<'a>>) -> Self {
203 Self {
204 client_id: id,
205 request_id: RequestId::new(),
206 request,
207 notification_channel: None,
208 token: None,
209 attested_contract: None,
210 }
211 }
212
213 pub fn with_notification(mut self, ch: UnboundedSender<HostResult>) -> Self {
214 self.notification_channel = Some(ch);
215 self
216 }
217
218 pub fn with_token(mut self, token: Option<AuthToken>) -> Self {
219 self.token = token;
220 self
221 }
222
223 pub fn with_attested_contract(mut self, contract: Option<ContractInstanceId>) -> Self {
224 self.attested_contract = contract;
225 self
226 }
227}
228
229pub trait ClientEventsProxy {
230 fn recv(&mut self) -> BoxFuture<'_, HostIncomingMsg>;
233
234 fn send(
236 &mut self,
237 id: ClientId,
238 response: Result<HostResponse, ClientError>,
239 ) -> BoxFuture<'_, Result<(), ClientError>>;
240}
241
242async fn register_subscription_listener(
244 op_manager: &OpManager,
245 instance_id: ContractInstanceId,
246 client_id: ClientId,
247 subscription_listener: UnboundedSender<HostResult>,
248 operation_type: &str,
249) -> Result<(), Error> {
250 tracing::debug!(
251 client_id = %client_id,
252 contract = %instance_id,
253 operation = operation_type,
254 "Registering subscription listener"
255 );
256 let register_listener = op_manager
257 .notify_contract_handler(ContractHandlerEvent::RegisterSubscriberListener {
258 key: instance_id,
259 client_id,
260 summary: None, subscriber_listener: subscription_listener,
262 })
263 .await
264 .inspect_err(|err| {
265 tracing::error!(
266 client_id = %client_id,
267 contract = %instance_id,
268 operation = operation_type,
269 error = %err,
270 "Register subscriber listener failed"
271 );
272 });
273 match register_listener {
274 Ok(ContractHandlerEvent::RegisterSubscriberListenerResponse) => {
275 tracing::debug!(
276 client_id = %client_id,
277 contract = %instance_id,
278 operation = operation_type,
279 "Subscriber listener registered successfully"
280 );
281 let result = op_manager
283 .ring
284 .add_client_subscription(&instance_id, client_id);
285 if result.is_first_client {
287 if let Some(event) = NetEventLog::seeding_started(&op_manager.ring, instance_id) {
288 op_manager.ring.register_events(Either::Left(event)).await;
289 }
290 }
291 Ok(())
292 }
293 _ => {
294 tracing::error!(
295 client_id = %client_id,
296 contract = %instance_id,
297 operation = operation_type,
298 phase = "registration_failed",
299 "Subscriber listener registration failed"
300 );
301 Err(Error::Op(OpError::UnexpectedOpState))
302 }
303 }
304}
305
306async fn report_op_init_error(
308 op_manager: &OpManager,
309 tx: crate::message::Transaction,
310 contract: &impl std::fmt::Display,
311 op_name: &str,
312 err: &OpError,
313 client_id: ClientId,
314 request_id: RequestId,
315) {
316 tracing::error!(
317 client_id = %client_id,
318 request_id = %request_id,
319 tx = %tx,
320 contract = %contract,
321 error = %err,
322 phase = "error",
323 "{op_name} request failed"
324 );
325
326 let error_kind = match err {
330 OpError::RingError(crate::ring::RingError::EmptyRing) => ErrorKind::EmptyRing,
331 OpError::RingError(crate::ring::RingError::PeerNotJoined) => ErrorKind::PeerNotJoined,
332 OpError::RingError(crate::ring::RingError::ConnError(_))
333 | OpError::RingError(crate::ring::RingError::NoCachingPeers(_))
334 | OpError::ConnError(_)
335 | OpError::ContractError(_)
336 | OpError::ExecutorError(_)
337 | OpError::UnexpectedOpState
338 | OpError::InvalidStateTransition { .. }
339 | OpError::NotificationError
340 | OpError::NotificationChannelError(_)
341 | OpError::IncorrectTxType(..)
342 | OpError::OpNotPresent(_)
343 | OpError::OpNotAvailable(_)
344 | OpError::StreamCancelled
345 | OpError::OrphanStreamClaimFailed
346 | OpError::StatePushed => ErrorKind::OperationError {
347 cause: format!("{op_name} operation failed: {err}").into(),
348 },
349 };
350
351 let error_response = Err(error_kind.into());
352
353 if let Err(e) = op_manager.result_router_tx.send((tx, error_response)).await {
354 tracing::error!(
355 tx = %tx,
356 error = %e,
357 "Failed to send {op_name} error to result router"
358 );
359 }
360
361 op_manager.completed(tx);
366}
367
368pub async fn client_event_handling<ClientEv>(
378 op_manager: Arc<OpManager>,
379 mut client_events: ClientEv,
380 mut client_responses: ClientResponsesReceiver,
381 node_controller: tokio::sync::mpsc::Sender<NodeEvent>,
382) -> anyhow::Result<Infallible>
383where
384 ClientEv: ClientEventsProxy + Send + 'static,
385{
386 let request_router = std::sync::Arc::new(crate::node::RequestRouter::new());
387 op_manager.set_request_router(request_router.clone());
390 let request_router = Some(request_router);
391 let mut results = FuturesUnordered::new();
392 loop {
393 crate::deterministic_select! {
395 client_request = client_events.recv() => {
396 let req = match client_request {
397 Ok(request) => {
398 tracing::debug!(
399 client_id = %request.client_id,
400 request_id = %request.request_id,
401 request_type = ?request.request,
402 "Received client request"
403 );
404 request
405 }
406 Err(error) if matches!(error.kind(), ErrorKind::Shutdown) => {
407 node_controller.send(NodeEvent::Disconnect { cause: None }).await.ok();
408 anyhow::bail!("shutdown event");
409 }
410 Err(error) if matches!(error.kind(), ErrorKind::TransportProtocolDisconnect) => {
411 tracing::debug!(error = %error, "Client transport disconnected");
414 continue;
415 }
416 Err(error) => {
417 tracing::debug!(error = %error, "Client error");
418 continue;
419 }
420 };
421 let cli_id = req.client_id;
422 let res = process_open_request(req, op_manager.clone(), request_router.clone()).await;
423 results.push(async move {
424 match res.await {
425 Ok(Some(Either::Left(res))) => (cli_id, Ok(Some(res))),
426 Ok(Some(Either::Right(mut cb))) => {
427 match cb.recv().await {
428 Some(res) => (cli_id, Ok(Some(res))),
429 None => (cli_id, Err(ClientError::from(ErrorKind::ChannelClosed))),
430 }
431 }
432 Ok(None) => (cli_id, Ok(None)),
433 Err(Error::Disconnected) => {
434 tracing::debug!(client_id = %cli_id, "Client disconnected");
435 (cli_id, Err(ClientError::from(ErrorKind::Disconnect)))
436 }
437 Err(Error::PeerNotJoined) => {
438 tracing::warn!(
439 client_id = %cli_id,
440 "Operation rejected: peer has not joined network yet - client should retry after join"
441 );
442 (cli_id, Err(ErrorKind::PeerNotJoined.into()))
443 }
444 Err(Error::EmptyRing) => {
445 tracing::warn!(
446 client_id = %cli_id,
447 "Operation rejected: no ring connections found - client should retry after connections are established"
448 );
449 (cli_id, Err(ErrorKind::EmptyRing.into()))
450 }
451 Err(err) => {
452 tracing::error!(
453 client_id = %cli_id,
454 error = %err,
455 "Operation error"
456 );
457 (cli_id, Err(ErrorKind::OperationError { cause: format!("{err}").into() }.into()))
458 }
459 }
460 });
461 },
462 res = client_responses.recv() => {
463 if let Some((cli_id, request_id, res)) = res {
464 if let Ok(result) = &res {
465 tracing::debug!(
466 client_id = %cli_id,
467 request_id = %request_id,
468 response = %result,
469 "Sending client response"
470 );
471 }
472 if let Err(err) = client_events.send(cli_id, res).await {
473 tracing::debug!(
474 client_id = %cli_id,
475 error = %err,
476 "Client channel closed, response dropped"
477 );
478 }
479 }
480 },
481 res = results.next(), if !results.is_empty() => {
482 let Some(f_res) = res else {
483 unreachable!("results.next() should only return None if results is empty, which is guarded against");
484 };
485 match f_res {
486 (cli_id, Ok(Some(res))) => {
487 let res = match res {
488 QueryResult::Connections(conns) => {
489 Ok(HostResponse::QueryResponse(QueryResponse::ConnectedPeers {
491 peers: conns.into_iter().filter_map(|p| {
492 KnownPeerKeyLocation::try_from(&p).ok().map(|known| {
493 (p.pub_key.to_string(), known.socket_addr())
494 })
495 }).collect() }
496 ))
497 }
498 QueryResult::GetResult { key, state, contract } => {
499 Ok(HostResponse::ContractResponse(ContractResponse::GetResponse {
500 key,
501 state,
502 contract,
503 }))
504 }
505 QueryResult::DelegateResult { response, .. } => {
506 response
507 }
508 QueryResult::NetworkDebug(debug_info) => {
509 let subscriptions = debug_info.application_subscriptions.into_iter().map(|sub| {
511 freenet_stdlib::client_api::SubscriptionInfo {
512 contract_key: sub.instance_id,
513 client_id: sub.client_id.into(),
514 }
515 }).collect();
516
517 let connected_peers = debug_info.connected_peers.into_iter().filter_map(|peer| {
519 KnownPeerKeyLocation::try_from(&peer).ok().map(|known| {
520 (peer.to_string(), known.socket_addr())
521 })
522 }).collect();
523
524 Ok(HostResponse::QueryResponse(QueryResponse::NetworkDebug(
525 freenet_stdlib::client_api::NetworkDebugInfo {
526 subscriptions,
527 connected_peers,
528 }
529 )))
530 }
531 QueryResult::NodeDiagnostics(response) => {
532 Ok(HostResponse::QueryResponse(QueryResponse::NodeDiagnostics(response)))
533 }
534 };
535 if let Ok(result) = &res {
536 tracing::debug!(
537 client_id = %cli_id,
538 response = %result,
539 "Sending client operation response"
540 );
541 }
542 if let Err(err) = client_events.send(cli_id, res).await {
543 tracing::debug!(
544 client_id = %cli_id,
545 error = %err,
546 "Client channel closed, operation response dropped"
547 );
548 }
549 }
550 (_, Ok(None)) => continue,
551 (cli_id, Err(err)) => {
552 tracing::error!(
553 client_id = %cli_id,
554 error = %err,
555 "Sending error response to client"
556 );
557 if let Err(send_err) = client_events.send(cli_id, Err(err)).await {
558 tracing::debug!(
559 client_id = %cli_id,
560 error = %send_err,
561 "Client channel closed, error response dropped"
562 );
563 }
564 }
565 }
566 },
567 }
568 }
569}
570
571#[derive(Debug, thiserror::Error)]
572enum Error {
573 #[error("Node not connected to network")]
574 Disconnected,
575 #[error("Peer has not joined the network yet (no ring location established)")]
576 PeerNotJoined,
577 #[error("No ring connections found")]
578 EmptyRing,
579 #[error("Node error: {0}")]
580 Node(String),
581 #[error(transparent)]
582 Contract(#[from] crate::contract::ContractError),
583 #[error(transparent)]
584 Op(#[from] OpError),
585 #[error(transparent)]
586 Executor(#[from] crate::contract::ExecutorError),
587 #[error(transparent)]
588 Panic(#[from] tokio::task::JoinError),
589}
590
591impl From<crate::ring::RingError> for Error {
592 fn from(err: crate::ring::RingError) -> Self {
593 match err {
594 crate::ring::RingError::PeerNotJoined => Error::PeerNotJoined,
595 crate::ring::RingError::EmptyRing => Error::EmptyRing,
596 other @ crate::ring::RingError::ConnError(_)
597 | other @ crate::ring::RingError::NoCachingPeers(_) => Error::Node(other.to_string()),
598 }
599 }
600}
601
602fn ensure_peer_ready(op_manager: &OpManager) -> Result<std::net::SocketAddr, Error> {
606 if !op_manager.is_gateway
607 && !op_manager
608 .peer_ready
609 .load(std::sync::atomic::Ordering::SeqCst)
610 {
611 return Err(Error::PeerNotJoined);
612 }
613 Ok(op_manager.ring.connection_manager.peer_addr()?)
614}
615
616#[inline]
617async fn process_open_request(
618 mut request: OpenRequest<'static>,
619 op_manager: Arc<OpManager>,
620 request_router: Option<Arc<crate::node::RequestRouter>>,
621) -> BoxFuture<'static, Result<Option<Either<QueryResult, mpsc::Receiver<QueryResult>>>, Error>> {
622 let (callback_tx, callback_rx) = if matches!(
623 &*request.request,
624 ClientRequest::NodeQueries(_) | ClientRequest::ContractOp(ContractRequest::Get { .. })
625 ) {
626 let (tx, rx) = mpsc::channel(1);
627 (Some(tx), Some(rx))
628 } else {
629 (None, None)
630 };
631
632 let fut = async move {
635 let client_id = request.client_id;
636 let request_id = request.request_id;
637
638 let subscription_listener: Option<UnboundedSender<HostResult>> =
639 request.notification_channel.take();
640
641 match *request.request {
642 ClientRequest::ContractOp(ops) => {
643 match ops {
644 ContractRequest::Put {
645 state,
646 contract,
647 related_contracts,
648 subscribe,
649 blocking_subscribe,
650 } => {
651 let peer_id = ensure_peer_ready(&op_manager)?;
652
653 tracing::debug!(
654 client_id = %client_id,
655 request_id = %request_id,
656 peer = %peer_id,
657 phase = "request",
658 "Received PUT request from client"
659 );
660
661 let contract_key = contract.key();
662
663 let own_location = op_manager.ring.connection_manager.own_location();
667 let skip_list: Vec<_> = own_location.socket_addr().into_iter().collect();
668 let has_remote_peers = op_manager
669 .ring
670 .closest_potentially_caching(&contract_key, skip_list.as_slice())
671 .is_some();
672
673 if !has_remote_peers {
674 tracing::debug!(
676 client_id = %client_id,
677 request_id = %request_id,
678 peer = %peer_id,
679 contract = %contract_key,
680 phase = "local_only",
681 "PUT will complete locally (no remote peers)"
682 );
683
684 let op = put::start_op(
687 contract.clone(),
688 related_contracts.clone(),
689 state.clone(),
690 op_manager.ring.max_hops_to_live,
691 subscribe,
692 blocking_subscribe,
693 );
694 let op_id = op.id;
695
696 op_manager
697 .ch_outbound
698 .waiting_for_transaction_result(op_id, client_id, request_id)
699 .await
700 .inspect_err(|err| {
701 tracing::error!(
702 client_id = %client_id,
703 request_id = %request_id,
704 tx = %op_id,
705 error = %err,
706 "Error waiting for transaction result"
707 )
708 })?;
709
710 if let Err(err) = put::request_put(&op_manager, op).await {
711 report_op_init_error(
712 &op_manager,
713 op_id,
714 &contract_key,
715 "PUT",
716 &err,
717 client_id,
718 request_id,
719 )
720 .await;
721 } else if subscribe {
722 if let Some(sl) = subscription_listener {
723 register_subscription_listener(
724 &op_manager,
725 *contract_key.id(),
726 client_id,
727 sl,
728 "PUT",
729 )
730 .await?;
731 } else {
732 tracing::warn!(
733 client_id = %client_id,
734 contract = %contract_key,
735 "PUT with subscribe=true but no subscription_listener"
736 );
737 }
738 }
739 } else if let Some(router) = &request_router {
740 tracing::debug!(
741 client_id = %client_id,
742 request_id = %request_id,
743 peer = %peer_id,
744 contract = %contract_key,
745 phase = "routing",
746 "Routing PUT request through deduplication layer"
747 );
748
749 let request = crate::node::DeduplicatedRequest::Put {
750 key: contract_key,
751 contract: contract.clone(),
752 related_contracts: related_contracts.clone(),
753 state: state.clone(),
754 subscribe,
755 blocking_subscribe,
756 client_id,
757 request_id,
758 };
759
760 let (transaction_id, should_start_operation) =
761 router.route_request(request).await.map_err(|e| {
762 Error::Node(format!("Request routing failed: {}", e))
763 })?;
764
765 op_manager
766 .ch_outbound
767 .waiting_for_transaction_result(
768 transaction_id,
769 client_id,
770 request_id,
771 )
772 .await
773 .inspect_err(|err| {
774 tracing::error!(
775 "Error waiting for transaction result: {}",
776 err
777 );
778 })?;
779
780 if should_start_operation {
781 tracing::debug!(
782 client_id = %client_id,
783 request_id = %request_id,
784 tx = %transaction_id,
785 peer = %peer_id,
786 contract = %contract_key,
787 phase = "new_operation",
788 "Starting new PUT network operation"
789 );
790
791 let op = put::start_op_with_id(
792 contract.clone(),
793 related_contracts.clone(),
794 state.clone(),
795 op_manager.ring.max_hops_to_live,
796 subscribe,
797 blocking_subscribe,
798 transaction_id,
799 );
800
801 if let Err(err) = put::request_put(&op_manager, op).await {
802 report_op_init_error(
803 &op_manager,
804 transaction_id,
805 &contract_key,
806 "PUT",
807 &err,
808 client_id,
809 request_id,
810 )
811 .await;
812 } else if subscribe {
813 if let Some(sl) = subscription_listener {
814 register_subscription_listener(
815 &op_manager,
816 *contract_key.id(),
817 client_id,
818 sl,
819 "PUT",
820 )
821 .await?;
822 } else {
823 tracing::warn!(
824 client_id = %client_id,
825 contract = %contract_key,
826 "PUT with subscribe=true but no subscription_listener"
827 );
828 }
829 }
830 } else {
831 tracing::debug!(
832 client_id = %client_id,
833 request_id = %request_id,
834 tx = %transaction_id,
835 peer = %peer_id,
836 contract = %contract_key,
837 phase = "reuse",
838 "Reusing existing PUT operation - client registered for result"
839 );
840 if subscribe {
841 if let Some(sl) = subscription_listener {
842 register_subscription_listener(
843 &op_manager,
844 *contract_key.id(),
845 client_id,
846 sl,
847 "PUT",
848 )
849 .await?;
850 } else {
851 tracing::warn!(
852 client_id = %client_id,
853 contract = %contract_key,
854 "PUT with subscribe=true but no subscription_listener"
855 );
856 }
857 }
858 }
859 } else {
860 tracing::debug!(
861 client_id = %client_id,
862 request_id = %request_id,
863 peer = %peer_id,
864 contract = %contract_key,
865 phase = "legacy",
866 "Starting direct PUT operation (legacy mode)"
867 );
868
869 let op = put::start_op(
870 contract.clone(),
871 related_contracts.clone(),
872 state.clone(),
873 op_manager.ring.max_hops_to_live,
874 subscribe,
875 blocking_subscribe,
876 );
877 let op_id = op.id;
878
879 op_manager
880 .ch_outbound
881 .waiting_for_transaction_result(op_id, client_id, request_id)
882 .await
883 .inspect_err(|err| {
884 tracing::error!(
885 client_id = %client_id,
886 request_id = %request_id,
887 tx = %op_id,
888 error = %err,
889 "Error waiting for transaction result"
890 )
891 })?;
892
893 if let Err(err) = put::request_put(&op_manager, op).await {
894 report_op_init_error(
895 &op_manager,
896 op_id,
897 &contract_key,
898 "PUT",
899 &err,
900 client_id,
901 request_id,
902 )
903 .await;
904 } else if subscribe {
905 if let Some(sl) = subscription_listener {
906 register_subscription_listener(
907 &op_manager,
908 *contract_key.id(),
909 client_id,
910 sl,
911 "PUT",
912 )
913 .await?;
914 } else {
915 tracing::warn!(
916 client_id = %client_id,
917 contract = %contract_key,
918 "PUT with subscribe=true but no subscription_listener"
919 );
920 }
921 }
922 }
923 }
924 ContractRequest::Update { key, data } => {
925 let peer_id = ensure_peer_ready(&op_manager)?;
926
927 tracing::debug!(
928 client_id = %client_id,
929 request_id = %request_id,
930 peer = %peer_id,
931 contract = %key,
932 phase = "request",
933 "Received UPDATE request from client"
934 );
935
936 let related_contracts = RelatedContracts::default();
937
938 tracing::debug!(
939 client_id = %client_id,
940 request_id = %request_id,
941 peer = %peer_id,
942 contract = %key,
943 data = ?data,
944 phase = "starting",
945 "Starting UPDATE operation - passing delta to network layer"
946 );
947
948 let update_data: UpdateData<'static> = match data {
951 UpdateData::State(s) => UpdateData::State(State::from(s.into_bytes())),
952 UpdateData::Delta(d) => {
953 UpdateData::Delta(StateDelta::from(d.into_bytes()))
954 }
955 UpdateData::StateAndDelta { state, delta } => {
956 UpdateData::StateAndDelta {
957 state: State::from(state.into_bytes()),
958 delta: StateDelta::from(delta.into_bytes()),
959 }
960 }
961 UpdateData::RelatedState { related_to, state } => {
962 UpdateData::RelatedState {
963 related_to,
964 state: State::from(state.into_bytes()),
965 }
966 }
967 UpdateData::RelatedDelta { related_to, delta } => {
968 UpdateData::RelatedDelta {
969 related_to,
970 delta: StateDelta::from(delta.into_bytes()),
971 }
972 }
973 UpdateData::RelatedStateAndDelta {
974 related_to,
975 state,
976 delta,
977 } => UpdateData::RelatedStateAndDelta {
978 related_to,
979 state: State::from(state.into_bytes()),
980 delta: StateDelta::from(delta.into_bytes()),
981 },
982 };
983
984 tracing::debug!(
985 client_id = %client_id,
986 request_id = %request_id,
987 peer = %peer_id,
988 contract = %key,
989 phase = "sending",
990 "Sending UPDATE operation to network layer"
991 );
992
993 if let Some(router) = &request_router {
994 tracing::debug!(
995 client_id = %client_id,
996 request_id = %request_id,
997 peer = %peer_id,
998 contract = %key,
999 phase = "routing",
1000 "Routing UPDATE request through deduplication layer"
1001 );
1002
1003 let request = crate::node::DeduplicatedRequest::Update {
1004 key,
1005 update_data: update_data.clone(),
1006 related_contracts: related_contracts.clone(),
1007 client_id,
1008 request_id,
1009 };
1010
1011 let (transaction_id, should_start_operation) =
1012 router.route_request(request).await.map_err(|e| {
1013 Error::Node(format!("Request routing failed: {}", e))
1014 })?;
1015
1016 op_manager
1018 .ch_outbound
1019 .waiting_for_transaction_result(
1020 transaction_id,
1021 client_id,
1022 request_id,
1023 )
1024 .await
1025 .inspect_err(|err| {
1026 tracing::error!(
1027 "Error waiting for transaction result: {}",
1028 err
1029 );
1030 })?;
1031
1032 if should_start_operation {
1034 tracing::debug!(
1035 client_id = %client_id,
1036 request_id = %request_id,
1037 tx = %transaction_id,
1038 peer = %peer_id,
1039 contract = %key,
1040 phase = "new_operation",
1041 "Starting new UPDATE network operation"
1042 );
1043
1044 let op = update::start_op_with_id(
1045 key,
1046 update_data.clone(),
1047 related_contracts,
1048 transaction_id,
1049 );
1050
1051 tracing::debug!(
1052 request_id = %request_id,
1053 transaction_id = %op.id,
1054 operation = "update",
1055 "Request-Transaction correlation"
1056 );
1057
1058 match update::request_update(&op_manager, op).await {
1059 Ok(()) | Err(OpError::StatePushed) => {}
1060 Err(err) => {
1061 report_op_init_error(
1062 &op_manager,
1063 transaction_id,
1064 &key,
1065 "UPDATE",
1066 &err,
1067 client_id,
1068 request_id,
1069 )
1070 .await;
1071 }
1072 }
1073 } else {
1074 tracing::debug!(
1075 client_id = %client_id,
1076 request_id = %request_id,
1077 tx = %transaction_id,
1078 peer = %peer_id,
1079 contract = %key,
1080 phase = "reuse",
1081 "Reusing existing UPDATE operation - client registered for result"
1082 );
1083 }
1084 } else {
1085 tracing::debug!(
1086 client_id = %client_id,
1087 request_id = %request_id,
1088 peer = %peer_id,
1089 contract = %key,
1090 phase = "legacy",
1091 "Starting direct UPDATE operation (legacy mode)"
1092 );
1093
1094 let op = update::start_op(key, update_data, related_contracts);
1096 let op_id = op.id;
1097
1098 tracing::debug!(
1099 request_id = %request_id,
1100 transaction_id = %op_id,
1101 operation = "update",
1102 "Request-Transaction correlation"
1103 );
1104
1105 op_manager
1106 .ch_outbound
1107 .waiting_for_transaction_result(op_id, client_id, request_id)
1108 .await
1109 .inspect_err(|err| {
1110 tracing::error!(
1111 "Error waiting for transaction result: {}",
1112 err
1113 );
1114 })?;
1115
1116 if let Err(err) = update::request_update(&op_manager, op).await {
1117 report_op_init_error(
1118 &op_manager,
1119 op_id,
1120 &key,
1121 "UPDATE",
1122 &err,
1123 client_id,
1124 request_id,
1125 )
1126 .await;
1127 }
1128 }
1129 }
1130 ContractRequest::Get {
1131 key,
1132 return_contract_code,
1133 subscribe,
1134 blocking_subscribe,
1135 } => {
1136 let peer_id = ensure_peer_ready(&op_manager)?;
1137
1138 let (full_key, state, contract) = match op_manager
1146 .notify_contract_handler(ContractHandlerEvent::GetQuery {
1147 instance_id: key,
1148 return_contract_code,
1149 })
1150 .await
1151 {
1152 Ok(ContractHandlerEvent::GetResponse {
1153 key: Some(full_key),
1154 response: Ok(StoreResponse { state, contract }),
1155 }) => (Some(full_key), state, contract),
1156 Ok(ContractHandlerEvent::GetResponse {
1157 key: None,
1158 response:
1159 Ok(StoreResponse {
1160 state: None,
1161 contract: None,
1162 }),
1163 }) => (None, None, None), Ok(ContractHandlerEvent::GetResponse {
1165 response: Err(err), ..
1166 }) => {
1167 tracing::error!(
1168 client_id = %client_id,
1169 request_id = %request_id,
1170 %key,
1171 error = %err,
1172 phase = "error",
1173 "GET query failed (executor error)"
1174 );
1175 return Err(Error::Executor(err));
1176 }
1177 Err(err) => {
1178 tracing::error!(
1179 client_id = %client_id,
1180 request_id = %request_id,
1181 %key,
1182 error = %err,
1183 phase = "error",
1184 "GET query failed (contract error)"
1185 );
1186 return Err(Error::Contract(err));
1187 }
1188 Ok(_) => {
1189 tracing::error!(
1190 client_id = %client_id,
1191 request_id = %request_id,
1192 %key,
1193 phase = "error",
1194 "GET query failed (unexpected state)"
1195 );
1196 return Err(Error::Op(OpError::UnexpectedOpState));
1197 }
1198 };
1199
1200 let connection_count = op_manager.ring.open_connections();
1209 let has_local_state = full_key.is_some() && state.is_some();
1210 let local_satisfies_request =
1211 has_local_state && (!return_contract_code || contract.is_some());
1212
1213 let is_subscribed = full_key
1217 .as_ref()
1218 .map(|k| op_manager.ring.should_seed(k))
1219 .unwrap_or(false);
1220
1221 if local_satisfies_request && (connection_count == 0 || is_subscribed) {
1225 let full_key = full_key.unwrap();
1226 let state = state.unwrap();
1227
1228 tracing::debug!(
1229 client_id = %client_id,
1230 request_id = %request_id,
1231 peer = %peer_id,
1232 contract = %full_key,
1233 is_subscribed,
1234 connection_count,
1235 phase = "local_cache",
1236 "Returning locally cached contract state (subscribed or isolated)"
1237 );
1238
1239 if subscribe {
1241 if let Some(subscription_listener) = subscription_listener {
1242 register_subscription_listener(
1243 &op_manager,
1244 *full_key.id(),
1245 client_id,
1246 subscription_listener,
1247 "local GET",
1248 )
1249 .await?;
1250 } else {
1251 tracing::warn!(
1252 client_id = %client_id,
1253 contract = %full_key,
1254 "GET with subscribe=true but no subscription_listener"
1255 );
1256 }
1257 }
1258
1259 return Ok(Some(Either::Left(QueryResult::GetResult {
1260 key: full_key,
1261 state,
1262 contract,
1263 })));
1264 }
1265
1266 if let Some(router) = &request_router {
1270 tracing::debug!(
1271 client_id = %client_id,
1272 request_id = %request_id,
1273 peer = %peer_id,
1274 contract = %key,
1275 has_local = has_local_state,
1276 is_subscribed,
1277 connection_count,
1278 phase = "network_routing",
1279 "Routing GET request through network (not subscribed or no local cache)"
1280 );
1281
1282 let request = crate::node::DeduplicatedRequest::Get {
1283 key,
1284 return_contract_code,
1285 subscribe,
1286 blocking_subscribe,
1287 client_id,
1288 request_id,
1289 };
1290
1291 let (transaction_id, should_start_operation) =
1292 router.route_request(request).await.map_err(|e| {
1293 Error::Node(format!("Request routing failed: {}", e))
1294 })?;
1295
1296 op_manager
1298 .ch_outbound
1299 .waiting_for_transaction_result(
1300 transaction_id,
1301 client_id,
1302 request_id,
1303 )
1304 .await
1305 .inspect_err(|err| {
1306 tracing::error!(
1307 "Error waiting for transaction result (get): {}",
1308 err
1309 );
1310 })?;
1311
1312 if should_start_operation {
1314 tracing::debug!(
1315 client_id = %client_id,
1316 request_id = %request_id,
1317 tx = %transaction_id,
1318 peer = %peer_id,
1319 contract = %key,
1320 phase = "new_operation",
1321 "Starting new GET network operation"
1322 );
1323
1324 let op = get::start_op_with_id(
1325 key,
1326 return_contract_code,
1327 subscribe,
1328 blocking_subscribe,
1329 transaction_id,
1330 );
1331
1332 if let Err(err) = get::request_get(
1333 &op_manager,
1334 op,
1335 VisitedPeers::new(&transaction_id),
1336 )
1337 .await
1338 {
1339 report_op_init_error(
1340 &op_manager,
1341 transaction_id,
1342 &key,
1343 "GET",
1344 &err,
1345 client_id,
1346 request_id,
1347 )
1348 .await;
1349 } else if subscribe {
1350 if let Some(sl) = subscription_listener {
1351 register_subscription_listener(
1352 &op_manager,
1353 key,
1354 client_id,
1355 sl,
1356 "GET",
1357 )
1358 .await?;
1359 } else {
1360 tracing::warn!(
1361 client_id = %client_id,
1362 contract = %key,
1363 "GET with subscribe=true but no subscription_listener"
1364 );
1365 }
1366 }
1367 } else {
1368 tracing::debug!(
1369 client_id = %client_id,
1370 request_id = %request_id,
1371 tx = %transaction_id,
1372 peer = %peer_id,
1373 contract = %key,
1374 phase = "reuse",
1375 "Reusing existing GET operation - client registered for result"
1376 );
1377
1378 if subscribe {
1380 if let Some(subscription_listener) = subscription_listener {
1381 register_subscription_listener(
1382 &op_manager,
1383 key,
1384 client_id,
1385 subscription_listener,
1386 "network GET",
1387 )
1388 .await?;
1389 } else {
1390 tracing::warn!(
1391 client_id = %client_id,
1392 contract = %key,
1393 "GET with subscribe=true but no subscription_listener"
1394 );
1395 }
1396 }
1397 }
1398 } else {
1399 tracing::debug!(
1400 client_id = %client_id,
1401 request_id = %request_id,
1402 peer = %peer_id,
1403 contract = %key,
1404 phase = "legacy",
1405 "Contract not found locally, starting direct GET operation (legacy mode)"
1406 );
1407
1408 let op = get::start_op(
1410 key,
1411 return_contract_code,
1412 subscribe,
1413 blocking_subscribe,
1414 );
1415 let op_id = op.id;
1416
1417 op_manager
1418 .ch_outbound
1419 .waiting_for_transaction_result(op_id, client_id, request_id)
1420 .await
1421 .inspect_err(|err| {
1422 tracing::error!(
1423 client_id = %client_id,
1424 request_id = %request_id,
1425 tx = %op_id,
1426 error = %err,
1427 "Error waiting for transaction result"
1428 )
1429 })?;
1430
1431 if let Err(err) =
1432 get::request_get(&op_manager, op, VisitedPeers::new(&op_id)).await
1433 {
1434 report_op_init_error(
1435 &op_manager,
1436 op_id,
1437 &key,
1438 "GET",
1439 &err,
1440 client_id,
1441 request_id,
1442 )
1443 .await;
1444 }
1445
1446 if subscribe {
1447 if let Some(subscription_listener) = subscription_listener {
1448 register_subscription_listener(
1449 &op_manager,
1450 key,
1451 client_id,
1452 subscription_listener,
1453 "GET",
1454 )
1455 .await?;
1456 } else {
1457 tracing::warn!(
1458 client_id = %client_id,
1459 contract = %key,
1460 "GET with subscribe=true but no subscription_listener"
1461 );
1462 }
1463 }
1464 }
1465 }
1466 ContractRequest::Subscribe { key, summary } => {
1467 let peer_id = ensure_peer_ready(&op_manager)?;
1468
1469 tracing::debug!(
1470 client_id = %client_id,
1471 request_id = %request_id,
1472 peer = %peer_id,
1473 contract = %key,
1474 phase = "request",
1475 "Received SUBSCRIBE request from client"
1476 );
1477
1478 let Some(subscriber_listener) = subscription_listener else {
1479 tracing::error!(
1480 client_id = %client_id,
1481 request_id = %request_id,
1482 contract = %key,
1483 "No subscriber listener for SUBSCRIBE request"
1484 );
1485 return Ok(None);
1486 };
1487
1488 let register_listener = op_manager
1489 .notify_contract_handler(
1490 ContractHandlerEvent::RegisterSubscriberListener {
1491 key,
1492 client_id,
1493 summary,
1494 subscriber_listener,
1495 },
1496 )
1497 .await
1498 .inspect_err(|err| {
1499 tracing::error!(
1500 client_id = %client_id,
1501 request_id = %request_id,
1502 contract = %key,
1503 error = %err,
1504 "Register subscriber listener error"
1505 );
1506 });
1507 match register_listener {
1508 Ok(ContractHandlerEvent::RegisterSubscriberListenerResponse) => {
1509 tracing::debug!(
1510 client_id = %client_id,
1511 request_id = %request_id,
1512 contract = %key,
1513 phase = "listener_registered",
1514 "Subscriber listener registered successfully"
1515 );
1516 let result =
1518 op_manager.ring.add_client_subscription(&key, client_id);
1519 if result.is_first_client {
1521 if let Some(event) =
1522 NetEventLog::seeding_started(&op_manager.ring, key)
1523 {
1524 op_manager.ring.register_events(Either::Left(event)).await;
1525 }
1526 }
1527 }
1528 _ => {
1529 tracing::error!(
1530 client_id = %client_id,
1531 request_id = %request_id,
1532 contract = %key,
1533 phase = "registration_failed",
1534 "Subscriber listener registration failed"
1535 );
1536 return Err(Error::Op(OpError::UnexpectedOpState));
1537 }
1538 }
1539
1540 if let Some(_router) = &request_router {
1547 tracing::debug!(
1548 client_id = %client_id,
1549 request_id = %request_id,
1550 peer = %peer_id,
1551 contract = %key,
1552 phase = "no_dedup",
1553 "Processing SUBSCRIBE without deduplication (instant-completion race avoidance)"
1554 );
1555
1556 let tx = crate::message::Transaction::new::<
1558 crate::operations::subscribe::SubscribeMsg,
1559 >();
1560
1561 use crate::contract::WaitingTransaction;
1563 op_manager
1564 .ch_outbound
1565 .waiting_for_transaction_result(
1566 WaitingTransaction::Transaction(tx),
1567 client_id,
1568 request_id,
1569 )
1570 .await
1571 .inspect_err(|err| {
1572 tracing::error!(
1573 "Error waiting for transaction result: {}",
1574 err
1575 );
1576 })?;
1577
1578 let _result_tx = crate::node::subscribe_with_id(
1581 op_manager.clone(),
1582 key,
1583 None, Some(tx),
1585 false,
1586 )
1587 .await
1588 .inspect_err(|err| {
1589 tracing::error!(
1590 client_id = %client_id,
1591 request_id = %request_id,
1592 tx = %tx,
1593 contract = %key,
1594 error = %err,
1595 phase = "error",
1596 "SUBSCRIBE operation failed"
1597 );
1598 })?;
1599
1600 tracing::debug!(
1601 request_id = %request_id,
1602 transaction_id = %tx,
1603 operation = "subscribe",
1604 "SUBSCRIBE operation started with dedicated transaction for this client"
1605 );
1606 } else {
1607 tracing::debug!(
1608 peer_id = %peer_id,
1609 key = %key,
1610 "Starting direct SUBSCRIBE operation",
1611 );
1612
1613 let tx = crate::message::Transaction::new::<
1615 crate::operations::subscribe::SubscribeMsg,
1616 >();
1617
1618 op_manager
1619 .ch_outbound
1620 .waiting_for_transaction_result(tx, client_id, request_id)
1621 .await
1622 .inspect_err(|err| {
1623 tracing::error!(
1624 "Error waiting for transaction result: {}",
1625 err
1626 );
1627 })?;
1628
1629 crate::node::subscribe_with_id(
1631 op_manager.clone(),
1632 key,
1633 None,
1634 Some(tx),
1635 false,
1636 )
1637 .await
1638 .inspect_err(|err| {
1639 tracing::error!(
1640 client_id = %client_id,
1641 request_id = %request_id,
1642 tx = %tx,
1643 contract = %key,
1644 error = %err,
1645 phase = "error",
1646 "SUBSCRIBE operation failed"
1647 );
1648 })?;
1649
1650 tracing::debug!(
1651 request_id = %request_id,
1652 transaction_id = %tx,
1653 operation = "subscribe",
1654 "Request-Transaction correlation"
1655 );
1656 }
1657 }
1658 _ => {
1659 tracing::error!(
1660 client_id = %client_id,
1661 request_id = %request_id,
1662 "Unsupported contract operation"
1663 );
1664 }
1665 }
1666 }
1667 ClientRequest::DelegateOp(req) => {
1668 tracing::debug!(
1669 client_id = %client_id,
1670 request_id = %request_id,
1671 phase = "request",
1672 "Received delegate operation from client"
1673 );
1674 let delegate_key = req.key().clone();
1675 let attested_contract = request.attested_contract;
1676
1677 let res = match op_manager
1678 .notify_contract_handler(ContractHandlerEvent::DelegateRequest {
1679 req,
1680 attested_contract,
1681 })
1682 .await
1683 {
1684 Ok(ContractHandlerEvent::DelegateResponse(res)) => res,
1685 Err(err) => {
1686 tracing::error!(
1687 client_id = %client_id,
1688 request_id = %request_id,
1689 delegate = %delegate_key,
1690 error = %err,
1691 phase = "error",
1692 "Delegate operation failed (contract error)"
1693 );
1694 return Err(Error::Contract(err));
1695 }
1696 Ok(_) => {
1697 tracing::error!(
1698 client_id = %client_id,
1699 request_id = %request_id,
1700 delegate = %delegate_key,
1701 phase = "error",
1702 "Delegate operation failed (unexpected state)"
1703 );
1704 return Err(Error::Op(OpError::UnexpectedOpState));
1705 }
1706 };
1707
1708 let host_response = Ok(HostResponse::DelegateResponse {
1709 key: delegate_key.clone(),
1710 values: res,
1711 });
1712
1713 if let Some(ch) = &subscription_listener {
1714 if ch.send(host_response).is_err() {
1715 tracing::error!(
1716 client_id = %client_id,
1717 request_id = %request_id,
1718 delegate = %delegate_key,
1719 "Failed to send delegate response through subscription channel"
1720 );
1721 }
1722 return Ok(None);
1723 }
1724
1725 return Ok(Some(Either::Left(QueryResult::DelegateResult {
1727 key: delegate_key,
1728 response: host_response,
1729 })));
1730 }
1731 ClientRequest::Disconnect { .. } => {
1732 tracing::debug!(
1733 client_id = %client_id,
1734 request_id = %request_id,
1735 "Received disconnect request from client, triggering subscription cleanup"
1736 );
1737
1738 let result = op_manager
1739 .ring
1740 .remove_client_from_all_subscriptions(client_id);
1741 for contract in &result.affected_contracts {
1742 op_manager.interest_manager.remove_local_client(contract);
1743 }
1744 if !result.affected_contracts.is_empty() {
1745 tracing::debug!(
1746 %client_id,
1747 subscriptions_cleaned = result.affected_contracts.len(),
1748 "Cleaned up client subscriptions and interest tracking"
1749 );
1750 }
1751
1752 for contract in &result.affected_contracts {
1754 if op_manager.ring.should_unsubscribe_upstream(contract) {
1755 let op_mgr = op_manager.clone();
1756 let contract = *contract;
1757 GlobalExecutor::spawn(async move {
1758 op_mgr.send_unsubscribe_upstream(&contract).await;
1759 });
1760 }
1761 }
1762
1763 if let Err(err) = op_manager.ch_outbound.send_to_handler_fire_and_forget(
1766 ContractHandlerEvent::ClientDisconnect { client_id },
1767 ) {
1768 tracing::warn!(
1769 %client_id,
1770 error = %err,
1771 "Failed to notify contract handler of client disconnect"
1772 );
1773 }
1774 }
1775 ClientRequest::NodeQueries(query) => {
1776 tracing::debug!(
1777 client_id = %client_id,
1778 request_id = %request_id,
1779 query = ?query,
1780 "Received node query from client"
1781 );
1782
1783 let Some(tx) = callback_tx else {
1784 tracing::error!(
1785 client_id = %client_id,
1786 request_id = %request_id,
1787 "callback_tx not available for NodeQueries"
1788 );
1789 unreachable!("callback_tx should always be Some for NodeQueries based on initialization logic");
1790 };
1791
1792 let node_event = match query {
1793 freenet_stdlib::client_api::NodeQuery::ConnectedPeers => {
1794 NodeEvent::QueryConnections { callback: tx }
1795 }
1796 freenet_stdlib::client_api::NodeQuery::SubscriptionInfo => {
1797 NodeEvent::QuerySubscriptions { callback: tx }
1798 }
1799 freenet_stdlib::client_api::NodeQuery::NodeDiagnostics { config } => {
1800 NodeEvent::QueryNodeDiagnostics {
1801 config,
1802 callback: tx,
1803 }
1804 }
1805 freenet_stdlib::client_api::NodeQuery::ProximityCacheInfo => {
1806 tracing::warn!(
1808 client_id = %client_id,
1809 request_id = %request_id,
1810 "ProximityCacheInfo query not yet implemented"
1811 );
1812 return Ok(None);
1813 }
1814 };
1815
1816 if let Err(err) = op_manager.notify_node_event(node_event).await {
1817 tracing::error!(
1818 client_id = %client_id,
1819 request_id = %request_id,
1820 error = %err,
1821 "notify_node_event error"
1822 );
1823 return Err(Error::from(err));
1824 }
1825
1826 return Ok(Some(Either::Right(callback_rx.unwrap())));
1827 }
1828 ClientRequest::Close => {
1829 return Err(Error::Disconnected);
1830 }
1831 ClientRequest::Authenticate { .. } | _ => {
1832 tracing::error!(
1833 client_id = %client_id,
1834 request_id = %request_id,
1835 "Unsupported operation"
1836 );
1837 }
1838 }
1839 Ok(None)
1840 };
1841
1842 GlobalExecutor::spawn(fut.instrument(tracing::info_span!(
1843 parent: tracing::Span::current(),
1844 "process_client_request"
1845 )))
1846 .map(|res| match res {
1847 Ok(Ok(res)) => Ok(res),
1848 Ok(Err(err)) => Err(err),
1849 Err(err) => {
1850 tracing::error!(
1851 error = %err,
1852 "Error processing client request (task panic)"
1853 );
1854 Err(Error::from(err))
1855 }
1856 })
1857 .boxed()
1858}
1859
1860pub(crate) mod test {
1861 use std::{
1862 collections::{HashMap, HashSet},
1863 time::Duration,
1864 };
1865
1866 use freenet_stdlib::{
1867 client_api::{ContractRequest, ErrorKind},
1868 prelude::*,
1869 };
1870 use futures::{FutureExt, StreamExt};
1871 use rand::SeedableRng;
1872 use tokio::net::TcpStream;
1873 use tokio::sync::watch::Receiver;
1874 use tokio::sync::Mutex;
1875 use tokio_tungstenite::tungstenite::Message;
1876 use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
1877
1878 use crate::{node::testing_impl::EventId, transport::TransportPublicKey};
1879
1880 use super::*;
1881
/// In-memory source of client requests, driven by a watch-channel signal.
///
/// Depending on whether `rng` is set, requests are either produced randomly
/// or looked up in `events_to_gen` by event id.
pub struct MemoryEventsGen<R = rand::rngs::SmallRng> {
    /// Key of this generator; signals addressed to a different key are ignored.
    key: TransportPublicKey,
    /// Watch channel delivering `(event id, target key)` pairs that trigger a request.
    signal: Receiver<(EventId, TransportPublicKey)>,
    /// Pre-registered requests, removed from the map when their event id is signalled.
    events_to_gen: HashMap<EventId, ClientRequest<'static>>,
    /// Random generator; when present, requests are generated randomly.
    rng: Option<R>,
    /// State threaded through random generation; populated by `rng_params`.
    internal_state: Option<InternalGeneratorState>,
    #[allow(dead_code)]
    // Receiving halves of the notification channels created for subscribing
    // requests. They are stored and never read; presumably they are kept only
    // so the paired senders stay connected — confirm.
    subscription_receivers: Vec<tokio::sync::mpsc::UnboundedReceiver<HostResult>>,
}
1893
1894 impl<R> MemoryEventsGen<R>
1895 where
1896 R: RandomEventGenerator,
1897 {
1898 pub fn new_with_seed(
1899 signal: Receiver<(EventId, TransportPublicKey)>,
1900 key: TransportPublicKey,
1901 seed: u64,
1902 ) -> Self {
1903 Self {
1904 signal,
1905 key,
1906 events_to_gen: HashMap::new(),
1907 rng: Some(R::seed_from_u64(seed)),
1908 internal_state: None,
1909 subscription_receivers: Vec::new(),
1910 }
1911 }
1912
1913 pub fn rng_params(
1914 &mut self,
1915 id: usize,
1916 num_peers: usize,
1917 max_contract_num: usize,
1918 iterations: usize,
1919 ) {
1920 let internal_state = InternalGeneratorState {
1921 this_peer: id,
1922 num_peers,
1923 max_contract_num,
1924 max_iterations: iterations,
1925 ..Default::default()
1926 };
1927 self.internal_state = Some(internal_state);
1928 }
1929
1930 async fn generate_rand_event(&mut self) -> Option<ClientRequest<'static>> {
1931 let (mut rng, mut state) = self
1932 .rng
1933 .take()
1934 .zip(self.internal_state.take())
1935 .expect("rng should be set");
1936
1937 let res = rng.gen_event(&mut state);
1941
1942 self.rng = Some(rng);
1943 self.internal_state = Some(state);
1944 res
1945 }
1946 }
1947
1948 impl MemoryEventsGen {
1949 #[cfg(any(test, feature = "testing"))]
1950 pub fn new(
1951 signal: Receiver<(EventId, TransportPublicKey)>,
1952 key: TransportPublicKey,
1953 ) -> Self {
1954 Self {
1955 signal,
1956 key,
1957 events_to_gen: HashMap::new(),
1958 rng: None,
1959 internal_state: None,
1960 subscription_receivers: Vec::new(),
1961 }
1962 }
1963 }
1964
1965 impl<R> MemoryEventsGen<R> {
1966 #[cfg(any(test, feature = "testing"))]
1967 pub fn generate_events(
1968 &mut self,
1969 events: impl IntoIterator<Item = (EventId, ClientRequest<'static>)>,
1970 ) {
1971 self.events_to_gen.extend(events)
1972 }
1973
1974 fn generate_deterministic_event(&mut self, id: &EventId) -> Option<ClientRequest<'static>> {
1975 self.events_to_gen.remove(id)
1976 }
1977
1978 fn build_open_request(
1981 &mut self,
1982 request: Box<ClientRequest<'static>>,
1983 ) -> OpenRequest<'static> {
1984 let notification_channel = match request.as_ref() {
1985 ClientRequest::ContractOp(ContractRequest::Subscribe { .. })
1986 | ClientRequest::ContractOp(ContractRequest::Put {
1987 subscribe: true, ..
1988 })
1989 | ClientRequest::ContractOp(ContractRequest::Get {
1990 subscribe: true, ..
1991 }) => {
1992 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
1993 self.subscription_receivers.push(rx);
1994 Some(tx)
1995 }
1996 ClientRequest::DelegateOp(_)
1997 | ClientRequest::ContractOp(_)
1998 | ClientRequest::Disconnect { .. }
1999 | ClientRequest::Authenticate { .. }
2000 | ClientRequest::NodeQueries(_)
2001 | ClientRequest::Close
2002 | _ => None,
2003 };
2004 OpenRequest {
2005 client_id: ClientId::FIRST,
2006 request_id: RequestId::new(),
2007 request,
2008 notification_channel,
2009 token: None,
2010 attested_contract: None,
2011 }
2012 .into_owned()
2013 }
2014 }
2015
2016 impl<R> ClientEventsProxy for MemoryEventsGen<R>
2017 where
2018 R: RandomEventGenerator + Send,
2019 {
2020 fn recv(&mut self) -> BoxFuture<'_, Result<OpenRequest<'static>, ClientError>> {
2021 async {
2022 loop {
2023 if self.signal.changed().await.is_ok() {
2024 let (ev_id, pk) = self.signal.borrow().clone();
2025 if self.rng.is_some() && pk == self.key {
2026 let request = self
2027 .generate_rand_event()
2028 .await
2029 .ok_or_else(|| ClientError::from(ErrorKind::Disconnect))?;
2030 return Ok(self.build_open_request(request.into()));
2031 } else if pk == self.key {
2032 let request = self
2033 .generate_deterministic_event(&ev_id)
2034 .expect("event not found");
2035 return Ok(self.build_open_request(request.into()));
2036 }
2037 } else {
2038 tokio::time::sleep(Duration::from_secs(1)).await;
2040 break Err(ErrorKind::Shutdown.into());
2041 }
2042 }
2043 }
2044 .boxed()
2045 }
2046
2047 fn send(
2048 &mut self,
2049 _id: ClientId,
2050 response: Result<HostResponse, ClientError>,
2051 ) -> BoxFuture<'_, Result<(), ClientError>> {
2052 if let Ok(HostResponse::ContractResponse(ContractResponse::GetResponse {
2053 key, ..
2054 })) = response
2055 {
2056 self.internal_state
2057 .as_mut()
2058 .expect("state should be set")
2059 .owns_contracts
2060 .insert(key);
2061 }
2062 async { Ok(()) }.boxed()
2063 }
2064 }
2065
/// Client event source triggered by binary messages read from a websocket,
/// delegating request generation to an inner [`MemoryEventsGen`].
pub struct NetworkEventGenerator<R = rand::rngs::SmallRng> {
    /// Key of this peer; incoming events carrying a different key are skipped.
    id: TransportPublicKey,
    /// Inner generator used to produce random requests and build open requests.
    memory_event_generator: MemoryEventsGen<R>,
    /// Shared websocket stream from which trigger messages are read.
    ws_client: Arc<Mutex<WebSocketStream<MaybeTlsStream<TcpStream>>>>,
}
2071
2072 impl<R> NetworkEventGenerator<R>
2073 where
2074 R: RandomEventGenerator,
2075 {
2076 pub fn new(
2077 id: TransportPublicKey,
2078 memory_event_generator: MemoryEventsGen<R>,
2079 ws_client: Arc<Mutex<WebSocketStream<MaybeTlsStream<TcpStream>>>>,
2080 ) -> Self {
2081 Self {
2082 id,
2083 memory_event_generator,
2084 ws_client,
2085 }
2086 }
2087 }
2088
2089 impl<R> ClientEventsProxy for NetworkEventGenerator<R>
2090 where
2091 R: RandomEventGenerator + Send + Clone,
2092 {
2093 fn recv(&mut self) -> BoxFuture<'_, HostIncomingMsg> {
2094 let ws_client_clone = self.ws_client.clone();
2095
2096 async move {
2097 loop {
2098 let message = {
2099 let mut lock = ws_client_clone.try_lock().inspect_err(|_| {
2100 tracing::error!(peer = %self.id, "failed to lock ws client");
2101 }).inspect(|_| {
2102 tracing::debug!(peer = %self.id, "locked ws client");
2103 }).unwrap();
2104 lock.next().await
2105 };
2106
2107 match message {
2108 Some(Ok(Message::Binary(data))) => {
2109 if let Ok((id, pub_key)) =
2110 bincode::deserialize::<(EventId, TransportPublicKey)>(&data)
2111 {
2112 tracing::debug!(peer = %self.id, %id, "Received event from the supervisor");
2113 if pub_key == self.id {
2114 let request = self
2115 .memory_event_generator
2116 .generate_rand_event()
2117 .await
2118 .ok_or_else(|| {
2119 ClientError::from(ErrorKind::Disconnect)
2120 })?;
2121 return Ok(self
2122 .memory_event_generator
2123 .build_open_request(request.into()));
2124 }
2125 } else {
2126 continue;
2127 }
2128 }
2129 None => {
2130 return Err(ClientError::from(ErrorKind::Disconnect));
2131 }
2132 _ => continue,
2133 }
2134 }
2135 }
2136 .boxed()
2137 }
2138
2139 fn send(
2140 &mut self,
2141 _id: ClientId,
2142 response: Result<HostResponse, ClientError>,
2143 ) -> BoxFuture<'_, Result<(), ClientError>> {
2144 if let Ok(HostResponse::ContractResponse(ContractResponse::GetResponse {
2145 key, ..
2146 })) = response
2147 {
2148 self.memory_event_generator
2149 .internal_state
2150 .as_mut()
2151 .expect("state should be set")
2152 .owns_contracts
2153 .insert(key);
2154 }
2155 async { Ok(()) }.boxed()
2156 }
2157 }
2158
/// Mutable state threaded through `RandomEventGenerator::gen_event`.
#[derive(Default)]
pub struct InternalGeneratorState {
    /// Index of this peer; an event is emitted only when the random peer draw equals it.
    this_peer: usize,
    /// Exclusive upper bound of the random peer draw.
    num_peers: usize,
    /// Number of generation iterations consumed so far.
    current_iteration: usize,
    /// Iteration budget; generation stops once `current_iteration` reaches it.
    max_iterations: usize,
    /// No new contracts are generated once `existing_contracts` has this many entries.
    max_contract_num: usize,
    /// Keys seen in successful `GetResponse`s; random updates are only emitted for these.
    owns_contracts: HashSet<ContractKey>,
    /// Contracts generated so far, used as the pool for put/get/update/subscribe events.
    existing_contracts: Vec<ContractContainer>,
}
2169
2170 pub trait RandomEventGenerator: Send + 'static {
2171 fn seed_from_u64(seed: u64) -> Self;
2172
2173 fn gen_u8(&mut self) -> u8;
2174
2175 fn gen_range(&mut self, range: std::ops::Range<usize>) -> usize;
2176
2177 fn choose<'a, T>(&mut self, vec: &'a [T]) -> Option<&'a T>;
2178
2179 fn choose_random_from_iter<'a, T>(
2180 &mut self,
2181 mut iter: impl ExactSizeIterator<Item = &'a T> + 'a,
2182 ) -> Option<&'a T> {
2183 let len = iter.len();
2184 let idx = self.gen_range(0..len);
2185 iter.nth(idx)
2186 }
2187
2188 fn gen_event(
2197 &mut self,
2198 state: &mut InternalGeneratorState,
2199 ) -> Option<ClientRequest<'static>> {
2200 while state.current_iteration < state.max_iterations {
2201 state.current_iteration += 1;
2202 let for_this_peer = self.gen_range(0..state.num_peers) == state.this_peer;
2203 match self.gen_range(0..100) {
2204 val if (0..5).contains(&val) => {
2205 if state.max_contract_num <= state.existing_contracts.len() {
2206 continue;
2207 }
2208 let contract = self.gen_contract_container();
2209 let request = ContractRequest::Put {
2210 contract: contract.clone(),
2211 state: WrappedState::new(self.random_byte_vec()),
2212 related_contracts: RelatedContracts::new(),
2213 subscribe: true,
2214 blocking_subscribe: false,
2215 };
2216 state.existing_contracts.push(contract);
2217 if !for_this_peer {
2218 continue;
2219 }
2220 return Some(request.into());
2221 }
2222 val if (5..20).contains(&val) => {
2223 if let Some(contract) = self.choose(&state.existing_contracts) {
2224 if !for_this_peer {
2225 continue;
2226 }
2227
2228 let request = ContractRequest::Put {
2229 contract: contract.clone(),
2230 state: WrappedState::new(self.random_byte_vec()),
2231 related_contracts: RelatedContracts::new(),
2232 subscribe: true,
2235 blocking_subscribe: false,
2236 };
2237
2238 tracing::debug!(
2239 "sending put to an existing contract with subscribe=true"
2240 );
2241
2242 return Some(request.into());
2243 }
2244 }
2245 val if (20..35).contains(&val) => {
2246 if let Some(contract) = self.choose(&state.existing_contracts) {
2247 if !for_this_peer {
2248 continue;
2249 }
2250 let key = contract.key();
2251 let request = ContractRequest::Get {
2252 key: *key.id(),
2253 return_contract_code: true,
2254 subscribe: false,
2255 blocking_subscribe: false,
2256 };
2257 return Some(request.into());
2258 }
2259 }
2260 val if (35..80).contains(&val) => {
2261 let new_state = UpdateData::State(State::from(self.random_byte_vec()));
2262 if let Some(contract) = self.choose(&state.existing_contracts) {
2263 if !for_this_peer {
2266 continue;
2267 }
2268 let request = ContractRequest::Update {
2269 key: contract.key(),
2270 data: new_state,
2271 };
2272 if state.owns_contracts.contains(&contract.key()) {
2273 return Some(request.into());
2274 }
2275 }
2276 }
2277 val if (80..100).contains(&val) => {
2278 let summary = StateSummary::from(self.random_byte_vec());
2279
2280 let Some(from_existing) = self.choose(state.existing_contracts.as_slice())
2281 else {
2282 continue;
2283 };
2284
2285 let key = from_existing.key();
2286 if !for_this_peer {
2287 continue;
2288 }
2289 let request = ContractRequest::Subscribe {
2290 key: *key.id(),
2291 summary: Some(summary),
2292 };
2293 return Some(request.into());
2294 }
2295 _ => unreachable!(
2296 "gen_range(0..100) should always fall into one of the defined ranges"
2297 ),
2298 }
2299 }
2300 None
2301 }
2302
2303 fn gen_contract_container(&mut self) -> ContractContainer {
2304 let code = ContractCode::from(self.random_byte_vec());
2305 let params = Parameters::from(self.random_byte_vec());
2306 ContractWasmAPIVersion::V1(WrappedContract::new(code.into(), params)).into()
2307 }
2308
2309 fn random_byte_vec(&mut self) -> Vec<u8> {
2310 (0..self.gen_u8())
2311 .map(|_| self.gen_u8())
2312 .collect::<Vec<_>>()
2313 }
2314 }
2315
2316 use rand::prelude::IndexedRandom;
2317
2318 impl RandomEventGenerator for rand::rngs::SmallRng {
2319 fn gen_u8(&mut self) -> u8 {
2320 <Self as rand::Rng>::random(self)
2321 }
2322
2323 fn gen_range(&mut self, range: std::ops::Range<usize>) -> usize {
2324 <Self as rand::Rng>::random_range(self, range)
2325 }
2326
2327 fn choose<'a, T>(&mut self, vec: &'a [T]) -> Option<&'a T> {
2328 vec.choose(self)
2329 }
2330
2331 fn seed_from_u64(seed: u64) -> Self {
2332 <Self as SeedableRng>::seed_from_u64(seed)
2333 }
2334 }
2335
/// Runs the generator for every peer index from the same seed, one thread per
/// peer, and checks that all peers end up with the same list of contracts.
///
/// NOTE(review): `max_iterations` is left at its derived default of 0 here, so
/// the `while` loop in `gen_event` looks like it never runs and every
/// `existing_contracts` would stay empty — confirm whether the test was meant
/// to set an iteration budget.
#[test]
fn test_gen_event() {
    const NUM_PEERS: usize = 20;
    const ITERATIONS: usize = 10_000;

    let handles: Vec<_> = (0..NUM_PEERS)
        .map(|this_peer| {
            std::thread::spawn(move || {
                let mut rng =
                    <rand::rngs::SmallRng as RandomEventGenerator>::seed_from_u64(15_978);
                let mut state = InternalGeneratorState {
                    this_peer,
                    num_peers: NUM_PEERS,
                    max_contract_num: 10,
                    ..Default::default()
                };
                (0..ITERATIONS).for_each(|_| {
                    rng.gen_event(&mut state);
                });
                state
            })
        })
        .collect();

    let mut states = Vec::with_capacity(NUM_PEERS);
    for handle in handles {
        states.push(handle.join().unwrap());
    }

    let reference = &states[0];
    for other in states.iter().skip(1) {
        assert_eq!(reference.existing_contracts, other.existing_contracts);
    }
}
2367}