veilid-core 0.5.3

Core library used to create a Veilid node and operate it as part of an application
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
mod protocol;

use super::*;

use crate::routing_table::*;
pub use protocol::*;
use std::io;

// Declares the log facility for this module. Presumably this makes the
// `veilid_log!` calls below log under the "net" facility; the macro is defined
// elsewhere, so confirm there.
impl_veilid_log_facility!("net");

/////////////////////////////////////////////////////////////////

/// Protocol settings the network was started with.
///
/// Computed by `Network::startup_internal` from the node configuration and
/// handed to the public internet routing domain editor.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ProtocolConfig {
    /// Protocols that may be used for outgoing connections.
    pub outbound: ProtocolTypeSet,
    /// Protocols accepted for incoming connections.
    pub inbound: ProtocolTypeSet,
    /// Address families usable in the public internet routing domain.
    pub family_global: AddressTypeSet,
    /// Public internet capabilities that remain after removing any the config disables.
    pub public_internet_capabilities: Vec<VeilidCapability>,
}

/// Mutable network state, kept behind the mutex in `Network::inner`.
struct NetworkInner {
    /// Set by `restart_network`; reset to `false` when the inner state is rebuilt.
    network_needs_restart: bool,
    /// Protocol settings computed during startup.
    protocol_config: ProtocolConfig,
}

/// Shared network state that is accessed without taking the `inner` mutex.
pub(super) struct NetworkUnlockedInner {
    /// Guards operations against startup and shutdown transitions.
    startup_lock: StartupLock,
}

/// Network backend for this platform.
///
/// Only WebSocket protocols are usable; the send paths reject UDP and TCP.
/// Clones share `inner` and `unlocked_inner` through `Arc`s.
#[derive(Clone)]
pub(super) struct Network {
    registry: VeilidComponentRegistry,
    inner: Arc<Mutex<NetworkInner>>,
    unlocked_inner: Arc<NetworkUnlockedInner>,
}

// Presumably generates the component accessors used below (`config()`,
// `network_manager()`, `routing_table()`, ...); the macro is defined elsewhere.
impl_veilid_component_accessors!(Network);

/// Exposes the `NetworkUnlockedInner` fields (such as `startup_lock`) directly
/// on `Network`, so methods can write `self.startup_lock`.
impl core::ops::Deref for Network {
    type Target = NetworkUnlockedInner;

    fn deref(&self) -> &NetworkUnlockedInner {
        &*self.unlocked_inner
    }
}

impl Network {
    fn new_inner() -> NetworkInner {
        NetworkInner {
            network_needs_restart: false,
            protocol_config: Default::default(),
        }
    }

    fn new_unlocked_inner() -> NetworkUnlockedInner {
        NetworkUnlockedInner {
            startup_lock: StartupLock::new(),
        }
    }

    pub fn new(registry: VeilidComponentRegistry) -> Self {
        Self {
            registry,
            inner: Arc::new(Mutex::new(Self::new_inner())),
            unlocked_inner: Arc::new(Self::new_unlocked_inner()),
        }
    }

    /////////////////////////////////////////////////////////////////

    /// Awaits `fut` and, if it reports `NetworkResult::NoConnection`, marks
    /// `dial_info` as failed in the address filter.
    ///
    /// Errors from `fut` are propagated unchanged and are not recorded as dial
    /// info failures. Every other `NetworkResult` is passed through as-is.
    async fn record_dial_info_failure<T, F: Future<Output = EyreResult<NetworkResult<T>>>>(
        &self,
        dial_info: DialInfo,
        fut: F,
    ) -> EyreResult<NetworkResult<T>> {
        let outcome = fut.await?;
        if let NetworkResult::NoConnection(_) = &outcome {
            self.network_manager()
                .address_filter()
                .set_dial_info_failed(dial_info);
        }
        Ok(outcome)
    }

    /// Sends `data` to `dial_info` over a new, short-lived connection.
    ///
    /// The connection exists only to deliver this one message and bypasses the
    /// connection table, because it is not a node-to-node connection. Only the
    /// WebSocket protocols are supported; UDP and TCP return an error. A
    /// punished IP returns `no_connection_other("punished")` without dialing.
    /// Any `NoConnection` outcome marks `dial_info` as failed.
    #[cfg_attr(feature = "instrument", instrument(level="trace", target="net", err, skip(self, data), fields(data.len = data.len())))]
    pub async fn send_data_unbound_to_dial_info(
        &self,
        dial_info: DialInfo,
        data: Bytes,
    ) -> EyreResult<NetworkResult<()>> {
        let _guard = self.startup_lock.enter()?;

        self.record_dial_info_failure(dial_info.clone(), async move {
            let sent_len = data.len();
            let connect_timeout_ms = self.config().network.connection_initial_timeout_ms;

            if self
                .network_manager()
                .address_filter()
                .is_ip_addr_punished(dial_info.address().ip_addr())
            {
                return Ok(NetworkResult::no_connection_other("punished"));
            }

            // Reject unsupported protocols. The supported ones are listed explicitly
            // so that this match stays exhaustive.
            match dial_info.protocol_type() {
                ProtocolType::UDP => {
                    bail!("no support for UDP protocol")
                }
                ProtocolType::TCP => {
                    bail!("no support for TCP protocol")
                }
                ProtocolType::WS => {}
                #[cfg(feature = "enable-protocol-wss")]
                ProtocolType::WSS => {}
            }

            // WS and WSS are both served by the websocket protocol handler
            let conn = network_result_try!(ws::WebsocketProtocolHandler::connect(
                self.registry(),
                &dial_info,
                connect_timeout_ms
            )
            .await
            .wrap_err("connect failure")?);
            network_result_try!(conn.send(data).await.wrap_err("send failure")?);

            // Network accounting
            self.network_manager()
                .stats_packet_sent(dial_info.ip_addr(), ByteCount::new(sent_len as u64));

            Ok(NetworkResult::Value(()))
        })
        .await
    }

    /// Sends `data` to `dial_info` over a new, short-lived connection, then waits
    /// up to `timeout_ms` milliseconds for a single response.
    ///
    /// The connection exists only for this exchange and bypasses the connection
    /// table, because it is not a node-to-node connection. Connecting is bounded
    /// by the configured `connection_initial_timeout_ms`. Only the WebSocket
    /// protocols are supported; UDP and TCP return an error. A punished IP
    /// returns `no_connection_other("punished")` without dialing. Any
    /// `NoConnection` outcome marks `dial_info` as failed.
    #[cfg_attr(feature = "instrument", instrument(level="trace", target="net", err, skip(self, data), fields(data.len = data.len())))]
    pub async fn send_recv_data_unbound_to_dial_info(
        &self,
        dial_info: DialInfo,
        data: Bytes,
        timeout_ms: u32,
    ) -> EyreResult<NetworkResult<Bytes>> {
        let _guard = self.startup_lock.enter()?;

        self.record_dial_info_failure(dial_info.clone(), async move {
            let data_len = data.len();
            let connect_timeout_ms = self.config().network.connection_initial_timeout_ms;

            if self
                .network_manager()
                .address_filter()
                .is_ip_addr_punished(dial_info.address().ip_addr())
            {
                return Ok(NetworkResult::no_connection_other("punished"));
            }

            // Reject unsupported protocols up front. The supported ones are listed
            // explicitly so that a new protocol variant causes a compile error here,
            // rather than needing a second re-match with `unreachable!()` arms.
            match dial_info.protocol_type() {
                ProtocolType::UDP => {
                    bail!("no support for UDP protocol")
                }
                ProtocolType::TCP => {
                    bail!("no support for TCP protocol")
                }
                ProtocolType::WS => {}
                #[cfg(feature = "enable-protocol-wss")]
                ProtocolType::WSS => {}
            }

            // WS and WSS are both served by the websocket protocol handler
            let pnc = network_result_try!(ws::WebsocketProtocolHandler::connect(
                self.registry(),
                &dial_info,
                connect_timeout_ms,
            )
            .await
            .wrap_err("connect failure")?);

            network_result_try!(pnc.send(data).await.wrap_err("send failure")?);
            self.network_manager()
                .stats_packet_sent(dial_info.ip_addr(), ByteCount::new(data_len as u64));

            // Wait up to `timeout_ms` for one reply.
            // - The inner `network_result_try!` short-circuits when the timeout fires
            //   (presumably `into_network_result` maps it to a timeout result).
            // - `?` propagates recv errors.
            // - The outer `network_result_try!` short-circuits on a non-value recv result.
            let out = network_result_try!(network_result_try!(timeout(timeout_ms, pnc.recv())
                .await
                .into_network_result())
            .wrap_err("recv failure")?);

            self.network_manager()
                .stats_packet_rcvd(dial_info.ip_addr(), ByteCount::new(out.len() as u64));

            Ok(NetworkResult::Value(out))
        })
        .await
    }

    /// Tries to send `data` over the existing connection identified by `flow`.
    ///
    /// Never opens a new connection. If no connection matches `flow`, or the
    /// connection does not accept the data, the data is returned in
    /// `SendDataToExistingFlowResult::NotSent` so the caller owns it again.
    /// UDP and TCP flows are rejected with an error.
    #[cfg_attr(feature = "instrument", instrument(level="trace", target="net", err, skip(self, data), fields(data.len = data.len())))]
    pub async fn send_data_to_existing_flow(
        &self,
        flow: Flow,
        data: Bytes,
    ) -> EyreResult<SendDataToExistingFlowResult> {
        let _guard = self.startup_lock.enter()?;

        let byte_len = data.len();
        let protocol_type = flow.protocol_type();
        if matches!(protocol_type, ProtocolType::UDP) {
            bail!("no support for UDP protocol");
        }
        if matches!(protocol_type, ProtocolType::TCP) {
            bail!("no support for TCP protocol");
        }

        // Only connection-oriented protocols remain: look up the exact connection
        let Some(conn) = self
            .network_manager()
            .connection_manager()
            .get_connection(flow)
        else {
            // No such connection: hand the data back to the caller
            return Ok(SendDataToExistingFlowResult::NotSent(data));
        };

        match conn.send_async(data).await {
            ConnectionHandleSendResult::Sent => {
                // Network accounting
                self.network_manager().stats_packet_sent(
                    flow.remote().socket_addr().ip(),
                    ByteCount::new(byte_len as u64),
                );
                Ok(SendDataToExistingFlowResult::Sent(conn.unique_flow()))
            }
            // The connection did not take the data: hand it back to the caller
            ConnectionHandleSendResult::NotSent(unsent) => {
                Ok(SendDataToExistingFlowResult::NotSent(unsent))
            }
        }
    }

    /// Sends `data` directly to `dial_info`, possibly without knowing which node
    /// is behind it, and returns the `UniqueFlow` of the connection used.
    ///
    /// The connection comes from the connection manager's
    /// `get_or_create_connection`. If the connection does not accept the data,
    /// the result is `NoConnection` with `ConnectionReset`. UDP and TCP return
    /// an error. Any `NoConnection` outcome marks `dial_info` as failed.
    #[cfg_attr(feature = "instrument", instrument(level="trace", target="net", err, skip(self, data), fields(data.len = data.len())))]
    pub async fn send_data_to_dial_info(
        &self,
        dial_info: DialInfo,
        data: Bytes,
    ) -> EyreResult<NetworkResult<UniqueFlow>> {
        let _guard = self.startup_lock.enter()?;

        self.record_dial_info_failure(dial_info.clone(), async move {
            let payload_len = data.len();
            match dial_info.protocol_type() {
                ProtocolType::UDP => {
                    bail!("no support for UDP protocol")
                }
                ProtocolType::TCP => {
                    bail!("no support for TCP protocol")
                }
                _ => {}
            }

            // Connection-oriented protocols: get an existing connection or create one
            let conn = network_result_try!(
                self.network_manager()
                    .connection_manager()
                    .get_or_create_connection(dial_info.clone())
                    .await?
            );

            if matches!(
                conn.send_async(data).await,
                ConnectionHandleSendResult::NotSent(_)
            ) {
                return Ok(NetworkResult::NoConnection(io::Error::new(
                    io::ErrorKind::ConnectionReset,
                    "failed to send",
                )));
            }
            let flow_used = conn.unique_flow();

            // Network accounting
            self.network_manager()
                .stats_packet_sent(dial_info.ip_addr(), ByteCount::new(payload_len as u64));

            Ok(NetworkResult::value(flow_used))
        })
        .await
    }

    /// Hole punching is not implemented on this platform.
    ///
    /// Always returns `NetworkResult::ServiceUnavailable` and never touches the network.
    #[cfg_attr(
        feature = "instrument",
        instrument(level = "trace", target = "net", err, skip(self), fields(__VEILID_LOG_KEY = self.log_key()))
    )]
    // Async without any await, presumably to keep the signature shared with other
    // platform backends. `allow` rather than `expect`, presumably because the
    // optional `instrument` attribute may change whether the lint fires; confirm.
    #[allow(clippy::unused_async)]
    pub async fn send_hole_punch(
        &self,
        _dial_info: DialInfo,
    ) -> EyreResult<NetworkResult<UniqueFlow>> {
        let reason = String::from("unimplemented for this platform");
        Ok(NetworkResult::ServiceUnavailable(reason))
    }

    /////////////////////////////////////////////////////////////////

    /// Computes this platform's protocol configuration, stores it in `inner`, and
    /// applies it to the public internet routing domain.
    ///
    /// Outbound WS (and WSS when the `enable-protocol-wss` feature is on) are
    /// enabled according to the config's `connect` flags. No inbound protocols
    /// are enabled. Always returns `StartupDisposition::Success`.
    pub async fn startup_internal(&self) -> EyreResult<StartupDisposition> {
        veilid_log!(self debug "starting network");
        // get protocol config
        let protocol_config = {
            let config = self.config();
            let inbound = ProtocolTypeSet::new();
            let mut outbound = ProtocolTypeSet::new();

            if config.network.protocol.ws.connect {
                outbound.insert(ProtocolType::WS);
            }
            #[cfg(feature = "enable-protocol-wss")]
            if config.network.protocol.wss.connect {
                outbound.insert(ProtocolType::WSS);
            }

            // IPv4 always, plus IPv6 when the platform supports it
            let family_global: AddressTypeSet = if is_ipv6_supported() {
                AddressType::IPV4 | AddressType::IPV6
            } else {
                AddressType::IPV4.into()
            };

            // Every public internet capability that the config does not disable
            let public_internet_capabilities = PUBLIC_INTERNET_CAPABILITIES
                .iter()
                .copied()
                .filter(|cap| !config.capabilities.disable.contains(cap))
                .collect::<Vec<VeilidCapability>>();

            ProtocolConfig {
                outbound,
                inbound,
                family_global,
                public_internet_capabilities,
            }
        };
        self.inner.lock().protocol_config = protocol_config.clone();

        // Start editing routing table
        let routing_table = self.routing_table();
        let mut editor_public_internet = routing_table.edit_public_internet_routing_domain();

        // Set up the routing table's network config. `protocol_config` is not used
        // after this, so its fields are moved in rather than cloned again.
        editor_public_internet.setup_network(
            protocol_config.outbound,
            protocol_config.inbound,
            protocol_config.family_global,
            protocol_config.public_internet_capabilities,
            true,
        );

        // Commit the routing domain edits; publish only when `commit` returns true
        if editor_public_internet.commit(true).await {
            editor_public_internet.publish();
        }

        Ok(StartupDisposition::Success)
    }

    /// Starts the network under the startup lock.
    ///
    /// The startup guard is marked successful only when `startup_internal`
    /// returns `StartupDisposition::Success`. On `BindRetry` or an error, the
    /// guard is dropped without calling `success()`.
    ///
    /// # Errors
    /// Returns an error if the startup lock refuses to start, or if
    /// `startup_internal` fails.
    #[cfg_attr(feature = "instrument", instrument(level = "debug", err, skip_all, fields(__VEILID_LOG_KEY = self.log_key())))]
    pub async fn startup(&self) -> EyreResult<StartupDisposition> {
        let guard = self.startup_lock.startup()?;

        match self.startup_internal().await {
            Ok(StartupDisposition::Success) => {
                veilid_log!(self debug "Network started");
                guard.success();
                Ok(StartupDisposition::Success)
            }
            Ok(StartupDisposition::BindRetry) => {
                // Log through the component's facility, like the success path
                veilid_log!(self debug "network bind retry");
                Ok(StartupDisposition::BindRetry)
            }
            Err(e) => {
                veilid_log!(self debug "network failed to start");
                Err(e)
            }
        }
    }

    /// Whether `restart_network` has flagged a restart since the inner state was last rebuilt.
    pub fn needs_restart(&self) -> bool {
        let inner = self.inner.lock();
        inner.network_needs_restart
    }

    /// Delegates to the startup lock's `is_started`.
    pub fn is_started(&self) -> bool {
        self.unlocked_inner.startup_lock.is_started()
    }

    /// Flags the network as needing a restart; read the flag back with `needs_restart`.
    #[cfg_attr(feature = "instrument", instrument(level = "debug", skip_all, fields(__VEILID_LOG_KEY = self.log_key())))]
    pub fn restart_network(&self) {
        let mut inner = self.inner.lock();
        inner.network_needs_restart = true;
    }

    /// Shuts the network down.
    ///
    /// Resets the public internet routing domain, then replaces the mutable
    /// inner state with a fresh one. If the startup lock refuses to enter
    /// shutdown, this logs an error ("already shut down") and returns without
    /// doing anything.
    #[cfg_attr(feature = "instrument", instrument(level = "debug", skip_all, fields(__VEILID_LOG_KEY = self.log_key())))]
    pub async fn shutdown(&self) {
        veilid_log!(self debug "starting low level network shutdown");
        let Ok(guard) = self.startup_lock.shutdown().await else {
            veilid_log!(self error "low level network is already shut down");
            return;
        };

        // Reset state
        let routing_table = self.routing_table();
        routing_table
            .edit_public_internet_routing_domain()
            .reset()
            .await;

        // Restore the inner state to its initial values, which clears the restart
        // flag and the protocol config. `NetworkInner` holds no background task
        // handles on this platform, so there is nothing to cancel here.
        *self.inner.lock() = Self::new_inner();

        guard.success();
        veilid_log!(self debug "finished low level network shutdown");
    }

    /// No preferred local address is tracked on this platform, so this always returns `None`.
    pub fn get_preferred_local_address(&self, _dial_info: &DialInfo) -> Option<SocketAddr> {
        Option::<SocketAddr>::None
    }

    /// No preferred local address is tracked per protocol/address type on this
    /// platform, so this always returns `None`.
    pub fn get_preferred_local_address_by_key(
        &self,
        _pt: ProtocolType,
        _at: AddressType,
    ) -> Option<SocketAddr> {
        Option::<SocketAddr>::None
    }

    //////////////////////////////////////////

    /// Always `false` on this platform. Logs at debug level when called before startup.
    #[expect(dead_code)]
    pub fn needs_update_dial_info(&self) -> bool {
        if self.startup_lock.enter().is_err() {
            veilid_log!(self debug "ignoring 'needs_update_dial_info' due to not started up");
            return false;
        }
        false
    }

    /// Always `false` on this platform. Logs at debug level when called before startup.
    pub fn resolved_detect_address_changes(&self) -> bool {
        if self.startup_lock.enter().is_err() {
            veilid_log!(self debug "ignoring 'resolved_detect_address_changes' due to not started up");
            return false;
        }
        false
    }

    /// No-op on this platform. Logs at debug level when called before startup.
    pub fn trigger_update_dial_info(&self, _routing_domain: RoutingDomain) {
        if self.startup_lock.enter().is_err() {
            veilid_log!(self debug "ignoring 'trigger_update_dial_info' due to not started up");
        }
    }
    //////////////////////////////////////////
    /// Periodic tick. There is no periodic work on this platform; this logs at
    /// debug level when called before startup and always returns `Ok(())`.
    #[cfg_attr(
        feature = "instrument",
        instrument(level = "trace", target = "net", name = "Network::tick", skip_all, err, fields(__VEILID_LOG_KEY = self.log_key()))
    )]
    // `allow` rather than `expect`, presumably because the optional `instrument`
    // attribute may change whether `unused_async` fires; confirm before switching.
    #[allow(clippy::unused_async)]
    pub async fn tick(&self) -> EyreResult<()> {
        if self.startup_lock.enter().is_err() {
            veilid_log!(self debug "ignoring 'Network::tick' due to not started up");
        }
        Ok(())
    }

    /// Nothing to cancel in this implementation; the method is empty.
    #[expect(clippy::unused_async)]
    pub async fn cancel_tasks(&self) {}
}