freenet 0.2.48

Freenet core software
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
//! Types and definitions to handle all inter-peer communication.
//!
//! This module provides the `NetworkBridge` trait, an abstraction layer for network interactions.
//! Implementations manage peer connections, message serialization, and routing.
//! It receives outgoing messages from the `Node` and `OpManager` event loops and forwards
//! incoming network messages (`NetMessage`) to the `Node`'s event loop via channels.
//!
//! See [`../../architecture.md`](../../architecture.md) for its interactions with event loops and other components.

use std::future::Future;
use std::net::SocketAddr;

use either::Either;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::{self, Receiver, Sender};

use crate::message::{NetMessage, NodeEvent};

pub(crate) mod broadcast_queue;
mod handshake;
pub(crate) mod in_memory;
pub(crate) mod p2p_protoc;
pub(crate) mod priority_select;

// Re-export fault injection types and functions for testing
pub use in_memory::{FaultInjectorState, NetworkStats, get_fault_injector, set_fault_injector};
// Re-export event loop exit reason for graceful shutdown handling
pub use p2p_protoc::EventLoopExitReason;

pub(crate) type ConnResult<T> = std::result::Result<T, ConnectionError>;

/// Allows handling of connections to the network as well as sending messages
/// to other peers in the network with whom connection has been established.
///
/// Connections are keyed by socket address since that's what identifies
/// a network connection. The cryptographic identity is handled separately
/// at the transport layer.
pub(crate) trait NetworkBridge: Send + Sync {
    #[allow(dead_code)]
    fn drop_connection(
        &mut self,
        peer_addr: SocketAddr,
    ) -> impl Future<Output = ConnResult<()>> + Send;

    fn send(
        &self,
        target_addr: SocketAddr,
        msg: NetMessage,
    ) -> impl Future<Output = ConnResult<()>> + Send;

    /// Send raw stream data to a peer using operations-level streaming.
    ///
    /// The `stream_id` should be created via `StreamId::next_operations()` so the
    /// receiver skips legacy InboundStream decoding. The data is fragmented and
    /// sent via the transport's streaming mechanism.
    fn send_stream(
        &self,
        target_addr: SocketAddr,
        stream_id: crate::transport::peer_connection::StreamId,
        data: bytes::Bytes,
        metadata: Option<bytes::Bytes>,
    ) -> impl Future<Output = ConnResult<()>> + Send;

    /// Pipe an inbound stream to a peer, forwarding fragments as they arrive.
    ///
    /// Unlike `send_stream` which takes complete data and fragments it, this forwards
    /// fragments from an existing `StreamHandle` incrementally without waiting for
    /// full reassembly. This enables low-latency forwarding at intermediate nodes.
    ///
    /// The `outbound_stream_id` should be created via `StreamId::next_operations()`.
    fn pipe_stream(
        &self,
        target_addr: SocketAddr,
        outbound_stream_id: crate::transport::peer_connection::StreamId,
        inbound_handle: crate::transport::peer_connection::streaming::StreamHandle,
        metadata: Option<bytes::Bytes>,
    ) -> impl Future<Output = ConnResult<()>> + Send;
}

#[derive(Debug, thiserror::Error, Serialize, Deserialize)]
pub(crate) enum ConnectionError {
    #[error("location unknown for this node")]
    LocationUnknown,
    #[error("unable to send message to {0}")]
    SendNotCompleted(SocketAddr),
    #[error("Unexpected connection req")]
    UnexpectedReq,
    #[error("error while de/serializing message")]
    #[serde(skip)]
    Serialization(#[from] Option<Box<bincode::ErrorKind>>),
    #[error("{0}")]
    TransportError(String),
    #[error("failed connect")]
    FailedConnectOp,
    #[error("unwanted connection")]
    UnwantedConnection,
    #[error("connection to/from address {0} blocked by local policy")]
    AddressBlocked(std::net::SocketAddr),

    // errors produced while handling the connection:
    #[error("IO error: {0}")]
    IOError(String),
    #[error("timeout error while waiting for a message")]
    Timeout,
}

impl From<std::io::Error> for ConnectionError {
    fn from(err: std::io::Error) -> Self {
        Self::IOError(format!("{err}"))
    }
}

impl From<crate::transport::TransportError> for ConnectionError {
    fn from(err: crate::transport::TransportError) -> Self {
        Self::TransportError(err.to_string())
    }
}

impl Clone for ConnectionError {
    fn clone(&self) -> Self {
        match self {
            Self::LocationUnknown => Self::LocationUnknown,
            Self::Serialization(_) => Self::Serialization(None),
            Self::SendNotCompleted(addr) => Self::SendNotCompleted(*addr),
            Self::IOError(err) => Self::IOError(err.clone()),
            Self::Timeout => Self::Timeout,
            Self::UnexpectedReq => Self::UnexpectedReq,
            Self::TransportError(err) => Self::TransportError(err.clone()),
            Self::FailedConnectOp => Self::FailedConnectOp,
            Self::UnwantedConnection => Self::UnwantedConnection,
            Self::AddressBlocked(addr) => Self::AddressBlocked(*addr),
        }
    }
}

use std::cell::Cell;

const CHANNEL_ID_BLOCK: u64 = 1_000_000;

thread_local! {
    static CHANNEL_ID_COUNTER: Cell<u64> = {
        let idx = crate::config::GlobalRng::thread_index();
        Cell::new(idx * CHANNEL_ID_BLOCK)
    };
}

/// Reset the channel ID counter to initial state for this thread.
/// Thread-local, so safe for parallel test execution.
pub fn reset_channel_id_counter() {
    let idx = crate::config::GlobalRng::thread_index();
    CHANNEL_ID_COUNTER.with(|c| c.set(idx * CHANNEL_ID_BLOCK));
}

/// Channel capacity for event loop notification and op execution channels.
const EVENT_LOOP_CHANNEL_CAPACITY: usize = 2048;

pub(crate) fn event_loop_notification_channel()
-> (EventLoopNotificationsReceiver, EventLoopNotificationsSender) {
    let _channel_id = CHANNEL_ID_COUNTER.with(|c| {
        let v = c.get();
        c.set(v + 1);
        v
    });
    let (notification_tx, notification_rx) = mpsc::channel(EVENT_LOOP_CHANNEL_CAPACITY);
    let (op_execution_tx, op_execution_rx) = mpsc::channel(EVENT_LOOP_CHANNEL_CAPACITY);

    tracing::info!(
        channel_id = _channel_id,
        "Created event loop notification channel pair"
    );

    (
        EventLoopNotificationsReceiver {
            notifications_receiver: notification_rx,
            op_execution_receiver: op_execution_rx,
        },
        EventLoopNotificationsSender {
            notifications_sender: notification_tx,
            op_execution_sender: op_execution_tx,
        },
    )
}

/// Payload shared between [`EventLoopNotificationsSender::op_execution_sender`]
/// and the event loop's `handle_op_execution` receiver.
///
/// `target_addr` carries the peer the driver wants the Request to reach on
/// the wire. When `Some`, `handle_op_execution` emits
/// [`ConnEvent::OutboundMessageWithTarget`] so the message is dispatched
/// directly to that peer's connection. When `None`, the message loops back
/// as a local [`ConnEvent::InboundMessage`] (the original Phase 2b behavior,
/// used by non-routed call sites and tests).
///
/// The target-aware path was added for issue #3838: when a client-initiated
/// SUBSCRIBE has the contract cached locally, the loop-back `InboundMessage`
/// was short-circuiting in `process_message` instead of propagating upstream
/// to register as a downstream subscriber on the home node.
pub(crate) type OpExecutionPayload = (Sender<NetMessage>, NetMessage, Option<SocketAddr>);

pub(crate) struct EventLoopNotificationsReceiver {
    pub(crate) notifications_receiver: Receiver<Either<NetMessage, NodeEvent>>,
    pub(crate) op_execution_receiver: Receiver<OpExecutionPayload>,
}

#[derive(Clone)]
pub(crate) struct EventLoopNotificationsSender {
    pub(crate) notifications_sender: Sender<Either<NetMessage, NodeEvent>>,
    pub(crate) op_execution_sender: Sender<OpExecutionPayload>,
}

impl EventLoopNotificationsSender {
    pub(crate) fn notifications_sender(&self) -> &Sender<Either<NetMessage, NodeEvent>> {
        &self.notifications_sender
    }

    /// Returns the number of messages currently queued in the notification channel.
    pub(crate) fn notification_channel_pending(&self) -> usize {
        self.notifications_sender
            .max_capacity()
            .saturating_sub(self.notifications_sender.capacity())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::GlobalExecutor;
    use either::Either;
    use freenet_stdlib::prelude::*;
    use tokio::time::{Duration, timeout};

    /// Test that notification channel works correctly with biased select
    /// This test simulates the event loop scenario where we use biased select
    /// to poll the notification channel along with other futures
    #[tokio::test]
    async fn test_notification_channel_with_biased_select() {
        // Create notification channel
        let (notification_channel, notification_tx) = event_loop_notification_channel();
        let mut rx = notification_channel.notifications_receiver;

        // Create a simple NodeEvent to test the channel
        let test_event = crate::message::NodeEvent::Disconnect { cause: None };

        // Spawn a task to send notification after a delay
        let sender = notification_tx.clone();
        GlobalExecutor::spawn(async move {
            tokio::time::sleep(Duration::from_millis(100)).await;
            tracing::info!("Sending notification");
            sender
                .notifications_sender()
                .send(Either::Right(test_event))
                .await
                .expect("Failed to send notification");
            tracing::info!("Notification sent successfully");
        });

        // Simulate event loop with biased select
        let (_dummy_tx, mut dummy_rx) = tokio::sync::mpsc::channel::<String>(10);
        let mut received = false;

        tracing::info!("Starting event loop simulation");
        for i in 0..50 {
            tracing::debug!("Loop iteration {}", i);

            let result = timeout(Duration::from_millis(100), async {
                tokio::select! {
                    biased;
                    msg = rx.recv() => {
                        tracing::info!("Received notification: {:?}", msg);
                        Some(msg)
                    }
                    _ = dummy_rx.recv() => {
                        tracing::debug!("Received dummy message");
                        None
                    }
                }
            })
            .await;

            match result {
                Ok(Some(Some(_msg))) => {
                    tracing::info!("Successfully received notification!");
                    received = true;
                    break;
                }
                Ok(Some(None)) => {
                    tracing::error!("Channel closed unexpectedly");
                    break;
                }
                Ok(None) => {
                    tracing::debug!("Dummy channel activity");
                }
                Err(_) => {
                    tracing::debug!("Timeout, continuing...");
                }
            }
        }

        assert!(received, "Notification was never received by event loop");
        tracing::info!("Test passed!");
    }

    /// Test that multiple notifications can be sent and received
    #[tokio::test]
    async fn test_multiple_notifications() {
        let (notification_channel, notification_tx) = event_loop_notification_channel();
        let mut rx = notification_channel.notifications_receiver;

        // Send 3 notifications
        for _i in 0..3 {
            let test_event = crate::message::NodeEvent::Disconnect { cause: None };

            notification_tx
                .notifications_sender()
                .send(Either::Right(test_event))
                .await
                .expect("Failed to send notification");
        }

        // Receive all 3
        let mut count = 0;
        while count < 3 {
            match timeout(Duration::from_secs(1), rx.recv()).await {
                Ok(Some(_)) => count += 1,
                Ok(None) => panic!("Channel closed unexpectedly"),
                Err(_) => panic!("Timeout waiting for notification {}", count + 1),
            }
        }

        assert_eq!(count, 3, "Should receive all 3 notifications");
    }

    /// Test channel behavior when receiver is dropped
    #[tokio::test]
    async fn test_send_fails_when_receiver_dropped() {
        let (notification_channel, notification_tx) = event_loop_notification_channel();

        // Drop the receiver
        drop(notification_channel);

        // Try to send - should fail
        let test_event = crate::message::NodeEvent::Disconnect { cause: None };

        let result = notification_tx
            .notifications_sender()
            .send(Either::Right(test_event))
            .await;

        assert!(result.is_err(), "Send should fail when receiver is dropped");
    }

    // Note: Tests that require NetMessage creation are omitted because
    // constructing valid NetMessage instances requires complex setup with
    // cryptographic keys, proper state management, and specific operation states.
    // The core channel functionality is already well-tested by the NodeEvent tests above.

    /// Test channel capacity doesn't block under normal load
    #[tokio::test]
    async fn test_channel_capacity() {
        let (notification_channel, notification_tx) = event_loop_notification_channel();
        let mut rx = notification_channel.notifications_receiver;

        // Send multiple messages without reading
        for _ in 0..50 {
            let test_event = crate::message::NodeEvent::Disconnect { cause: None };
            notification_tx
                .notifications_sender()
                .send(Either::Right(test_event))
                .await
                .expect("Should not block with capacity of 100");
        }

        // Now read them all
        let mut count = 0;
        while count < 50 {
            match timeout(Duration::from_millis(10), rx.recv()).await {
                Ok(Some(_)) => count += 1,
                _ => break,
            }
        }

        assert_eq!(count, 50, "Should receive all 50 messages");
    }

    /// Test EventLoopNotificationsSender clone works correctly
    #[tokio::test]
    async fn test_sender_clone() {
        let (notification_channel, notification_tx) = event_loop_notification_channel();
        let mut rx = notification_channel.notifications_receiver;

        // Clone the sender
        let cloned_tx = notification_tx.clone();

        // Send from original
        let test_event1 = crate::message::NodeEvent::Disconnect { cause: None };
        notification_tx
            .notifications_sender()
            .send(Either::Right(test_event1))
            .await
            .expect("Should send from original");

        // Send from clone
        let test_event2 = crate::message::NodeEvent::Disconnect {
            cause: Some("cloned".into()),
        };
        cloned_tx
            .notifications_sender()
            .send(Either::Right(test_event2))
            .await
            .expect("Should send from clone");

        // Both should be received
        let mut received = 0;
        for _ in 0..2 {
            if timeout(Duration::from_millis(100), rx.recv()).await.is_ok() {
                received += 1;
            }
        }
        assert_eq!(received, 2, "Should receive both messages");
    }
}

// ConnectionError tests
#[cfg(test)]
mod connection_error_tests {
    use super::*;

    #[test]
    fn test_connection_error_clone() {
        let errors = vec![
            ConnectionError::LocationUnknown,
            ConnectionError::SendNotCompleted("127.0.0.1:8080".parse().unwrap()),
            ConnectionError::UnexpectedReq,
            ConnectionError::Serialization(None),
            ConnectionError::TransportError("test error".to_string()),
            ConnectionError::FailedConnectOp,
            ConnectionError::UnwantedConnection,
            ConnectionError::AddressBlocked("127.0.0.1:8080".parse().unwrap()),
            ConnectionError::IOError("io error".to_string()),
            ConnectionError::Timeout,
        ];

        for error in errors {
            let cloned = error.clone();
            // Verify clone produces equivalent error
            assert_eq!(format!("{}", error), format!("{}", cloned));
        }
    }

    #[test]
    fn test_connection_error_from_io_error() {
        let io_error = std::io::Error::other("test io error");
        let conn_error: ConnectionError = io_error.into();

        match conn_error {
            ConnectionError::IOError(msg) => {
                assert!(msg.contains("test io error"));
            }
            ConnectionError::LocationUnknown
            | ConnectionError::SendNotCompleted(_)
            | ConnectionError::UnexpectedReq
            | ConnectionError::Serialization(_)
            | ConnectionError::TransportError(_)
            | ConnectionError::FailedConnectOp
            | ConnectionError::UnwantedConnection
            | ConnectionError::AddressBlocked(_)
            | ConnectionError::Timeout => panic!("Expected IOError variant"),
        }
    }

    #[test]
    fn test_connection_error_display() {
        let error = ConnectionError::LocationUnknown;
        let display = format!("{}", error);
        assert!(!display.is_empty());
        assert!(display.contains("location unknown"));

        let error = ConnectionError::SendNotCompleted("127.0.0.1:8080".parse().unwrap());
        let display = format!("{}", error);
        assert!(display.contains("127.0.0.1:8080"));

        let error = ConnectionError::AddressBlocked("192.168.1.1:9000".parse().unwrap());
        let display = format!("{}", error);
        assert!(display.contains("192.168.1.1:9000"));

        let error = ConnectionError::Timeout;
        let display = format!("{}", error);
        assert!(display.contains("timeout"));
    }

    #[test]
    fn test_serialization_error_clone_loses_inner() {
        // When cloning a Serialization error, the inner error is lost
        let inner = Box::new(bincode::ErrorKind::SizeLimit);
        let original = ConnectionError::Serialization(Some(inner));
        let cloned = original.clone();

        match cloned {
            ConnectionError::Serialization(None) => {} // Expected - inner is lost
            ConnectionError::LocationUnknown
            | ConnectionError::SendNotCompleted(_)
            | ConnectionError::UnexpectedReq
            | ConnectionError::Serialization(_)
            | ConnectionError::TransportError(_)
            | ConnectionError::FailedConnectOp
            | ConnectionError::UnwantedConnection
            | ConnectionError::AddressBlocked(_)
            | ConnectionError::IOError(_)
            | ConnectionError::Timeout => panic!("Expected Serialization(None) after clone"),
        }
    }

    #[test]
    fn test_connection_error_debug() {
        let error = ConnectionError::FailedConnectOp;
        let debug = format!("{:?}", error);
        assert!(!debug.is_empty());
        assert!(debug.contains("FailedConnectOp"));
    }
}