netavark 1.16.1

A container network stack
#![cfg_attr(not(unix), allow(unused_imports))]

use crate::dhcp_proxy::cache::{Clear, LeaseCache};
use crate::dhcp_proxy::dhcp_service::{process_client_stream, DhcpV4Service};
use crate::dhcp_proxy::ip;
use crate::dhcp_proxy::lib::g_rpc::netavark_proxy_server::{NetavarkProxy, NetavarkProxyServer};
use crate::dhcp_proxy::lib::g_rpc::{
    Empty, Lease as NetavarkLease, NetworkConfig, OperationResponse,
};
use crate::dhcp_proxy::proxy_conf::{
    get_cache_fqname, get_proxy_sock_fqname, DEFAULT_INACTIVITY_TIMEOUT, DEFAULT_TIMEOUT,
};
use crate::error::{NetavarkError, NetavarkResult};
use crate::network::core_utils;
use clap::Parser;
use log::{debug, error, warn};
use tokio::task::AbortHandle;

use std::collections::HashMap;
use std::fs::File;
use std::io::Write;
use std::os::unix::io::FromRawFd;
use std::os::unix::net::UnixListener as stdUnixListener;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::{env, fs};
#[cfg(unix)]
use tokio::net::UnixListener;
#[cfg(unix)]
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot::error::TryRecvError;
use tokio::sync::{mpsc, oneshot};
use tokio::time::{timeout, Duration};
#[cfg(unix)]
use tokio_stream::wrappers::UnixListenerStream;
use tonic::{
    transport::Server, Code, Code::Internal, Code::InvalidArgument, Request, Response, Status,
};

#[derive(Debug)]
/// This is the tonic netavark proxy service that is required to impl the NetavarkProxy trait, which
/// includes the gRPC methods defined in proto/proxy.proto. We can store an atomically reference
/// counted mutex cache in the structure.
///
/// The cache needs to be **safely mutable across multiple threads**. We need to share the lease cache
/// across multiple threads for 2 reasons:
/// 1. Each tonic request is spawned in its own new thread.
/// 2. A new thread must be spawned in any request that uses mozim, such as get_lease. This is because
///    tonic creates its own runtime for each request and mozim tries to make its own runtime inside of
///    a runtime.
///
struct NetavarkProxyService<W: Write + Clear> {
    // cache is the lease hashmap
    cache: Arc<Mutex<LeaseCache<W>>>,
    // the timeout for the DHCP DORA operation
    dora_timeout: u32,
    // channel send-side for resetting the inactivity timeout
    timeout_sender: Option<Arc<Mutex<Sender<i32>>>>,
    // Every DHCP poll is spawned on its own task; keep track of the handles so
    // we can abort them on teardown. The key is the container mac.
    task_map: Arc<Mutex<HashMap<String, AbortHandle>>>,
}

impl<W: Write + Clear> NetavarkProxyService<W> {
    fn reset_inactivity_timeout(&self) {
        if let Some(sender) = &self.timeout_sender {
            let sender_clone = sender.clone();
            let locked_sender = match sender_clone.lock() {
                Ok(v) => v,
                Err(e) => {
                    log::error!("{e}");
                    return;
                }
            };
            match locked_sender.try_send(1) {
                Ok(..) => {}
                Err(e) => log::error!("{e}"),
            }
        }
    }
}

// gRPC request and response methods
#[tonic::async_trait]
impl<W: Write + Clear + Send + 'static> NetavarkProxy for NetavarkProxyService<W> {
    /// gRPC connection to get a lease
    async fn setup(
        &self,
        request: Request<NetworkConfig>,
    ) -> Result<Response<NetavarkLease>, Status> {
        debug!("Request from client {:?}", request.remote_addr());
        // notify server of activity
        self.reset_inactivity_timeout();

        let cache = self.cache.clone();
        let timeout = self.dora_timeout;
        let task_map = self.task_map.clone();

        // set up client disconnect detection
        let network_config = request.into_inner();
        // _tx will be dropped when the request is dropped; this completes rx, which means
        // the client disconnected
        let (_tx, mut rx) = oneshot::channel::<()>();
        let lease = tokio::task::spawn(async move {
            // Check if the connection has been dropped before attempting to get a lease
            if rx.try_recv() == Err(TryRecvError::Closed) {
                log::debug!("Request dropped, aborting DORA");
                return Err(Status::new(Code::Aborted, "client disconnected"));
            }
            let get_lease = process_setup(network_config, timeout, cache, task_map);
            // watch the client and the lease; whichever finishes first returns
            let get_lease: NetavarkLease = tokio::select! {
                _ = &mut rx => {
                    // we never send to tx, so this completing means that the other end, tx, was dropped!
                    log::debug!("Request dropped, aborting DORA");
                    return Err(Status::new(Code::Aborted, "client disconnected"))
                }
                lease = get_lease => {
                    Ok::<NetavarkLease, Status>(lease?)
                }
            }?;
            // check after lease was found that the client is still there
            if rx.try_recv() == Err(TryRecvError::Closed) {
                log::debug!("Request dropped, aborting DORA");
                return Err(Status::new(Code::Aborted, "client disconnected"));
            }

            Ok(get_lease)
        })
        .await;
        return match lease {
            Ok(Ok(lease)) => Ok(Response::new(lease)),
            Ok(Err(status)) => Err(status),
            Err(e) => Err(Status::new(Code::Unknown, e.to_string())),
        };
    }

    /// When a container is shut down this method should be called. It will clear the lease information
    /// from the caching system.
    async fn teardown(
        &self,
        request: Request<NetworkConfig>,
    ) -> Result<Response<NetavarkLease>, Status> {
        // notify server of activity
        self.reset_inactivity_timeout();
        let nc = request.into_inner();

        let cache = self.cache.clone();
        let tasks = self.task_map.clone();

        let task = tasks
            .lock()
            .expect("lock tasks")
            .remove(&nc.container_mac_addr);
        if let Some(handle) = task {
            handle.abort();
        }

        // Remove the client's lease from the cache
        let lease = cache
            .lock()
            .expect("Could not unlock cache. A thread was poisoned")
            .remove_lease(&nc.container_mac_addr)
            .map_err(|e| Status::internal(e.to_string()))?;

        Ok(Response::new(lease))
    }

    /// On teardown of the proxy the cache will be cleared gracefully.
    async fn clean(&self, request: Request<Empty>) -> Result<Response<OperationResponse>, Status> {
        debug!("Request from client: {:?}", request.remote_addr());
        self.cache
            .clone()
            .lock()
            .expect("Could not unlock cache. A thread was poisoned")
            .teardown()?;
        Ok(Response::new(OperationResponse { success: true }))
    }
}

#[derive(Parser, Debug)]
#[clap(version = env!("CARGO_PKG_VERSION"))]
pub struct Opts {
    /// location to store backup files
    #[clap(short, long)]
    dir: Option<String>,
    /// alternative uds location
    #[clap(short, long)]
    uds: Option<String>,
    /// optional time in seconds to wait for a lease before timing out
    #[clap(short, long)]
    timeout: Option<u32>,
    /// inactivity timeout in seconds before the proxy exits
    #[clap(short, long)]
    activity_timeout: Option<u64>,
}
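
// Illustrative invocation, assuming the proxy is exposed as netavark's `dhcp-proxy`
// subcommand (the long flags map directly to the fields above; paths and values are made up):
//
//   netavark dhcp-proxy --uds /run/custom/nv-proxy.sock --timeout 8 --activity-timeout 300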

/// Handle termination signals.
///
/// Will wait until the process receives a SIGTERM or SIGINT (ctrl+c) signal, then clean up and shut down
async fn handle_signal(uds_path: PathBuf) {
    tokio::spawn(async move {
        // Set up signal hooks with expect; it is important these are in place so data is not corrupted
        let mut sigterm = signal(SignalKind::terminate()).expect("Could not set up SIGTERM hook");
        let mut sigint = signal(SignalKind::interrupt()).expect("Could not set up SIGINT hook");
        // Wait for either a SIGINT or a SIGTERM to clean up
        tokio::select! {
            _ = sigterm.recv() => {
                warn!("Received SIGTERM, cleaning up and exiting");
            }
            _ = sigint.recv() => {
                warn!("Received SIGINT, cleaning up and exiting");
            }
        }
        if let Err(e) = fs::remove_file(uds_path) {
            error!("Could not close uds socket: {e}");
        }

        std::process::exit(0x0100);
    });
}

#[tokio::main]
pub async fn serve(opts: Opts) -> NetavarkResult<()> {
    let optional_run_dir = opts.dir.as_deref();
    let dora_timeout = opts.timeout.unwrap_or(DEFAULT_TIMEOUT);
    let inactivity_timeout =
        Duration::from_secs(opts.activity_timeout.unwrap_or(DEFAULT_INACTIVITY_TIMEOUT));

    let uds_path = get_proxy_sock_fqname(optional_run_dir);
    debug!("socket path: {}", &uds_path.display());

    let mut is_systemd_activated = false;

    // Check if the UDS is a systemd socket-activated service. If it is,
    // systemd hands the socket over to us on FD 3.
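    // (Per sd_listen_fds(3), passed descriptors start at SD_LISTEN_FDS_START = 3 and
    // LISTEN_FDS holds how many were passed; we only accept exactly one here.)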
    let uds: UnixListener = match env::var("LISTEN_FDS") {
        Ok(effds) => {
            if effds != "1" {
                return Err(NetavarkError::msg("Received more than one FD from systemd"));
            }
            is_systemd_activated = true;
            let systemd_socket = unsafe { stdUnixListener::from_raw_fd(3) };
            systemd_socket.set_nonblocking(true)?;
            UnixListener::from_std(systemd_socket)?
        }
        // Use the standard socket approach
        Err(..) => {
            // Create a new uds socket path
            match Path::new(&uds_path).parent() {
                None => {
                    return Err(NetavarkError::msg("Could not get parent from uds path"));
                }
                Some(f) => tokio::fs::create_dir_all(f).await?,
            }
            // Watch for signals after the uds path has been created, so that the socket can be closed.
            handle_signal(uds_path.clone()).await;
            UnixListener::bind(&uds_path)?
        }
    };

    let uds_stream = UnixListenerStream::new(uds);

    // Create the cache file
    let fq_cache_path = get_cache_fqname(optional_run_dir);
    let file = match File::create(&fq_cache_path) {
        Ok(file) => {
            debug!("Successfully created leases file: {fq_cache_path:?}");
            file
        }
        Err(e) => {
            return Err(NetavarkError::msg(format!(
                "Exiting. Could not create lease cache file: {e}",
            )));
        }
    };

    let cache = match LeaseCache::new(file) {
        Ok(c) => Arc::new(Mutex::new(c)),
        Err(e) => {
            return Err(NetavarkError::msg(format!(
                "Could not setup the cache: {e}"
            )));
        }
    };

    // Create send and receive channels for activity timeout. If anything is
    // sent by the tx side, the inactivity timeout is reset
    let (activity_timeout_tx, activity_timeout_rx) = if inactivity_timeout.as_secs() > 0 {
        let (tx, rx) = mpsc::channel(5);
        (Some(tx), Some(rx))
    } else {
        (None, None)
    };
    let netavark_proxy_service = NetavarkProxyService {
        cache: cache.clone(),
        dora_timeout,
        timeout_sender: activity_timeout_tx
            .clone()
            .map(|tx| Arc::new(Mutex::new(tx))),
        task_map: Arc::new(Mutex::new(HashMap::new())),
    };

    let server = Server::builder()
        .add_service(NetavarkProxyServer::new(netavark_proxy_service))
        .serve_with_incoming(uds_stream);

    tokio::pin!(server);

    tokio::select! {
        //  a timeout duration of 0 means NEVER
        _ = handle_wakeup(activity_timeout_rx, inactivity_timeout, cache.clone()), if inactivity_timeout.as_secs() > 0  => {},
        _ = &mut server => {},
    };

    // Make sure to only remove the socket path when we are not socket activated;
    // otherwise we would delete the socket systemd is using, which causes all new connections to fail.
    if !is_systemd_activated {
        fs::remove_file(uds_path)?;
    }
    Ok(())
}

/// Manages the timeout lifecycle for the proxy server based on a defined timeout.
///
/// # Arguments
///
/// * `rx`: receive side of the activity channel
/// * `timeout_duration`: inactivity duration before shutdown
/// * `current_cache`: the shared lease cache
///
/// returns: ()
async fn handle_wakeup<W: Write + Clear>(
    rx: Option<mpsc::Receiver<i32>>,
    timeout_duration: Duration,
    current_cache: Arc<Mutex<LeaseCache<W>>>,
) {
    if let Some(mut rx) = rx {
        loop {
            match timeout(timeout_duration, rx.recv()).await {
                Ok(Some(_)) => {
                    debug!("timeout timer reset")
                }
                Ok(None) => {
                    println!("timeout channel closed");
                    break;
                }
                Err(_) => {
                    // only 'exit' if the timeout is met AND there are no leases
                    // if we do not exit, the activity_timeout is reset
                    if is_catch_empty(current_cache.clone()) {
                        println!(
                            "timeout met: exiting after {} secs of inactivity",
                            timeout_duration.as_secs()
                        );
                        break;
                    }
                }
            }
        }
    }
}

/// is_catch_empty returns whether the in-memory lease cache is empty
///
/// # Arguments
///
/// * `current_cache`: the shared lease cache
///
/// returns: bool
fn is_catch_empty<W: Write + Clear>(current_cache: Arc<Mutex<LeaseCache<W>>>) -> bool {
    match current_cache.lock() {
        Ok(v) => {
            debug!("cache_len is {}", v.len());
            v.is_empty()
        }
        Err(e) => {
            log::error!("{e}");
            false
        }
    }
}

/// Process a network config into a lease and set up the IP
///
/// # Arguments
///
/// * `network_config`: Network config
/// * `timeout`: DORA timeout
/// * `cache`: lease cache
/// * `tasks`: map of running DHCP poll tasks, keyed by container MAC
///
/// returns: Result<Lease, Status>
async fn process_setup<W: Write + Clear>(
    network_config: NetworkConfig,
    timeout: u32,
    cache: Arc<Mutex<LeaseCache<W>>>,
    tasks: Arc<Mutex<HashMap<String, AbortHandle>>>,
) -> Result<NetavarkLease, Status> {
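    // Illustrative request contents (field names are those of the NetworkConfig message
    // used below; the values are made up):
    //   container_mac_addr: "aa:bb:cc:dd:ee:ff"
    //   container_iface:    "eth0"
    //   ns_path:            "/run/netns/<id>"
    //   version:            0   // 0 => DHCPv4, 1 => DHCPv6 (not yet supported)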
    let container_network_interface = network_config.container_iface.clone();
    let ns_path = network_config.ns_path.clone();

    // test if mac is valid
    core_utils::CoreUtils::decode_address_from_hex(&network_config.container_mac_addr)
        .map_err(|e| Status::new(InvalidArgument, format!("{e}")))?;
    let mac = &network_config.container_mac_addr.clone();

    let nv_lease = match network_config.version {
        // V4
        0 => {
            let mut service = DhcpV4Service::new(network_config, timeout)?;

            let lease = service.get_lease().await?;
            let task = tokio::spawn(process_client_stream(service));
            tasks
                .lock()
                .expect("lock tasks")
                .insert(mac.to_string(), task.abort_handle());
            lease
        }
        // V6 TODO: implement DHCPv6
        1 => {
            return Err(Status::new(InvalidArgument, "ipv6 not yet supported"));
        }
        _ => {
            return Err(Status::new(InvalidArgument, "invalid protocol version"));
        }
    };

    if let Err(e) = cache
        .lock()
        .expect("Could not unlock cache. A thread was poisoned")
        .add_lease(mac, &nv_lease)
    {
        return Err(Status::new(
            Internal,
            format!("Error caching the lease: {e}"),
        ));
    }

    ip::setup(&nv_lease, &container_network_interface, &ns_path)?;
    Ok(nv_lease)
}