bpfman-api 0.6.0

gRPC bindings to the bpfman API
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of bpfman

use std::{
    collections::HashMap,
    fs::{create_dir_all, remove_dir_all, remove_file},
    os::unix::fs::chown,
    path::Path,
    sync::Arc,
};

use anyhow::{Context, Result};
use async_trait::async_trait;
use aya::maps::MapData;
use bpfman::{
    types::ListFilter,
    utils::{SOCK_MODE, create_bpffs, set_dir_permissions, set_file_permissions},
};
use bpfman_csi::v1::{
    GetPluginCapabilitiesRequest, GetPluginCapabilitiesResponse, GetPluginInfoRequest,
    GetPluginInfoResponse, NodeExpandVolumeRequest, NodeExpandVolumeResponse,
    NodeGetCapabilitiesRequest, NodeGetCapabilitiesResponse, NodeGetInfoRequest,
    NodeGetInfoResponse, NodeGetVolumeStatsRequest, NodeGetVolumeStatsResponse,
    NodePublishVolumeRequest, NodePublishVolumeResponse, NodeServiceCapability,
    NodeStageVolumeRequest, NodeStageVolumeResponse, NodeUnpublishVolumeRequest,
    NodeUnpublishVolumeResponse, NodeUnstageVolumeRequest, NodeUnstageVolumeResponse, ProbeRequest,
    ProbeResponse,
    identity_server::{Identity, IdentityServer},
    node_server::{Node, NodeServer},
    node_service_capability, volume_capability,
};
use log::{debug, error, info, warn};
use nix::mount::{MsFlags, mount, umount};
use tokio::{
    net::UnixListener,
    sync::{Mutex, broadcast},
};
use tokio_stream::wrappers::UnixListenerStream;
use tonic::{Request, Response, Status, transport::Server};

use crate::AsyncBpfman;

/// Driver name reported by the CSI Identity service (`GetPluginInfo`).
const DRIVER_NAME: &str = "csi.bpfman.io";
/// Volume-context key holding the comma-separated list of map names to publish.
const MAPS_KEY: &str = "csi.bpfman.io/maps";
/// Volume-context key naming the program whose maps are published; its value is
/// compared against each program's `OPERATOR_PROGRAM_KEY` metadata entry.
const PROGRAM_KEY: &str = "csi.bpfman.io/program";
/// Program metadata key that stores the *Program CRD name.
const OPERATOR_PROGRAM_KEY: &str = "bpfman.io/ProgramName";
/// Unix socket the CSI gRPC services are bound to.
const RTPATH_BPFMAN_CSI_SOCKET: &str = "/run/bpfman/csi/csi.sock";
/// Parent directory of the per-volume bpffs instances (one subdirectory per volume_id).
const RTDIR_BPFMAN_CSI_FS: &str = "/run/bpfman/csi/fs";
// Node Publish Volume Error code constant mirrored from: https://github.com/container-storage-interface/spec/blob/master/spec.md#nodepublishvolume-errors
// 5 is the gRPC NOT_FOUND status code; it is converted into a tonic `Code` at use sites.
const NPV_NOT_FOUND: i32 = 5;
/// Mode applied to a publish target directory that this driver had to create.
/// NOTE(review): despite the name, 0o750 is rwxr-x--- (owner rwx, group r-x,
/// others none).
const OWNER_READ_WRITE: u32 = 0o0750;

/// Owns the CSI Identity and Node service implementations that
/// `StorageManager::run` serves on the CSI Unix socket.
pub struct StorageManager {
    csi_identity: CsiIdentity,
    csi_node: CsiNode,
}

/// State backing the CSI Identity service.
struct CsiIdentity {
    /// Driver name returned by `GetPluginInfo`.
    name: String,
    /// Vendor version returned by `GetPluginInfo`.
    version: String,
}

/// State backing the CSI Node service.
struct CsiNode {
    /// Node id returned by `NodeGetInfo` (populated from `KUBE_NODE_NAME`).
    node_id: String,
    /// Shared bpfman handle; locked while a volume is being published.
    db_lock: Arc<Mutex<AsyncBpfman>>,
}

#[async_trait]
impl Identity for CsiIdentity {
    async fn get_plugin_info(
        &self,
        _request: Request<GetPluginInfoRequest>,
    ) -> Result<Response<GetPluginInfoResponse>, Status> {
        return Ok(Response::new(GetPluginInfoResponse {
            name: self.name.clone(),
            vendor_version: self.version.clone(),
            manifest: HashMap::new(),
        }));
    }

    async fn probe(
        &self,
        _request: Request<ProbeRequest>,
    ) -> Result<Response<ProbeResponse>, Status> {
        return Ok(Response::new(ProbeResponse {
            ..Default::default()
        }));
    }

    // Actual caps are defined in the CSIDriver K8s resource.
    async fn get_plugin_capabilities(
        &self,
        _request: Request<GetPluginCapabilitiesRequest>,
    ) -> Result<Response<GetPluginCapabilitiesResponse>, Status> {
        return Ok(Response::new(GetPluginCapabilitiesResponse {
            ..Default::default()
        }));
    }
}

#[async_trait]
impl Node for CsiNode {
    async fn node_stage_volume(
        &self,
        _request: Request<NodeStageVolumeRequest>,
    ) -> std::result::Result<Response<NodeStageVolumeResponse>, tonic::Status> {
        return Err(Status::unimplemented(""));
    }
    async fn node_unstage_volume(
        &self,
        _request: Request<NodeUnstageVolumeRequest>,
    ) -> std::result::Result<Response<NodeUnstageVolumeResponse>, tonic::Status> {
        return Err(Status::unimplemented(""));
    }
    async fn node_publish_volume(
        &self,
        request: Request<NodePublishVolumeRequest>,
    ) -> std::result::Result<Response<NodePublishVolumeResponse>, tonic::Status> {
        let req = request.get_ref();
        let volume_cap = &req.volume_capability;
        let fs_group = volume_cap.as_ref().and_then(|volume_capability| {
            volume_capability
                .access_type
                .as_ref()
                .and_then(|at| match at {
                    volume_capability::AccessType::Mount(v) => Some(&v.volume_mount_group),
                    _ => None,
                })
        });

        let volume_id = &req.volume_id;
        let target_path = &req.target_path;
        let volume_context = &req.volume_context;
        // TODO (astoycos) support readonly bpf pins.
        let read_only = &req.readonly;

        debug!(
            "Received publish volume request with :\n\
                volume_caps: {volume_cap:#?},\n\
                volume_id: {volume_id},\n\
                target_path: {target_path},\n\
                volume_context: {volume_context:#?},\n\
                read_only: {read_only}, \n
                fs_group: {fs_group:?}"
        );

        let bpfman_lock = self.db_lock.lock().await;
        match (
            volume_context.get(MAPS_KEY),
            volume_context.get(PROGRAM_KEY),
        ) {
            (Some(m), Some(program_name)) => {
                let maps: Vec<&str> = m.split(',').collect();

                // Find the Program with the specified *Program CRD name
                let prog_data = bpfman_lock
                    .list_programs(ListFilter::default())
                    .await
                    .map_err(|e| Status::aborted(format!("failed list programs: {e}")))?
                    .into_iter()
                    .find(|p| {
                        p.get_data()
                            .get_metadata()
                            .expect("unable to get program metadata")
                            .get(OPERATOR_PROGRAM_KEY)
                            == Some(program_name)
                    })
                    .ok_or(Status::new(
                        NPV_NOT_FOUND.into(),
                        format!("Bpfman Program {program_name} not found"),
                    ))?;

                let core_map_path = prog_data
                    .get_data()
                    .get_map_pin_path()
                    .map_err(|e| Status::aborted(format!("failed to get map_pin_path: {e}")))?
                    .expect("map pin path should be set since the program is already loaded")
                    .to_owned();

                // Create the Target Path if it doesn't exist
                let target = Path::new(target_path);
                if !target.exists() {
                    create_dir_all(target).map_err(|e| {
                        Status::new(
                            NPV_NOT_FOUND.into(),
                            format!("failed creating target path {target_path:?}: {e}"),
                        )
                    })?;
                    set_dir_permissions(target_path, OWNER_READ_WRITE);
                }

                // Make a new bpf fs specifically for the pod.
                let path = &Path::new(RTDIR_BPFMAN_CSI_FS).join(volume_id);

                // Volume_id is unique to the instance of the pod, if it get's restarted it will
                // be new.
                create_dir_all(path)?;

                create_bpffs(
                    path.as_os_str()
                        .to_str()
                        .expect("unable to convert path to str"),
                )
                .map_err(|e| {
                    Status::new(
                        NPV_NOT_FOUND.into(),
                        format!("failed creating bpf-fs for pod {volume_id}: {e}"),
                    )
                })?;

                // Allow unprivileged container to access the bpffs
                if let Some(fs_group) = fs_group {
                    debug!("Setting GID of bpffs {} to {fs_group}", path.display());
                    chown(path, None, fs_group.parse().ok())?;
                };

                // Load the desired maps from the fs and re-pin to new fs.
                maps.iter().try_for_each(|m| {
                    debug!("Loading map {m} from {core_map_path:?}");
                    let map = MapData::from_pin(core_map_path.join(m)).map_err(|e| {
                        Status::new(
                            NPV_NOT_FOUND.into(),
                            format!("map {m} not found in {program_name}'s pinned maps: {e}"),
                        )
                    })?;
                    debug!("Re-pinning map {m} to {path:?}");
                    let map_path = path.join(m);
                    map.pin(&map_path).map_err(|e| {
                        Status::new(
                            NPV_NOT_FOUND.into(),
                            format!(
                                "failed re-pinning map {m} for {program_name}'s bpf-fs: {e:#?}"
                            ),
                        )
                    })?;

                    // Ensure unprivileged container access to bpffs pins
                    if let Some(fs_group) = fs_group {
                        debug!(
                            "Setting GID and permissions of map {} to {fs_group} and 0660",
                            map_path.display()
                        );
                        chown(&map_path, None, fs_group.parse().ok())?;
                        set_file_permissions(&map_path, 0o0660)
                    };
                    Ok::<(), Status>(())
                })?;

                // mount the bpffs into the container
                mount_fs_in_container(path.to_str().unwrap(), target_path).map_err(|e| {
                    Status::new(
                        NPV_NOT_FOUND.into(),
                        format!("failed mounting bpffs {path:?} to container {target_path}: {e}"),
                    )
                })?;

                Ok(Response::new(NodePublishVolumeResponse {}))
            }
            (_, Some(program)) => {
                let err_msg = format!("No {MAPS_KEY} set in volume context from {program}");
                warn!("{}", err_msg);

                Err(Status::new(NPV_NOT_FOUND.into(), err_msg))
            }
            (Some(m), _) => {
                let err_msg =
                    format!("No {PROGRAM_KEY} set in volume context in for requested maps {m}");
                warn!("{}", err_msg);

                Err(Status::new(NPV_NOT_FOUND.into(), err_msg))
            }
            (_, _) => {
                let err_msg = format!("No {MAPS_KEY} or {PROGRAM_KEY} set in volume context");
                warn!("{}", err_msg);

                Err(Status::new(NPV_NOT_FOUND.into(), err_msg))
            }
        }
    }
    async fn node_unpublish_volume(
        &self,
        request: Request<NodeUnpublishVolumeRequest>,
    ) -> std::result::Result<Response<NodeUnpublishVolumeResponse>, tonic::Status> {
        let req = request.get_ref();
        let volume_id = &req.volume_id;
        let target_path = &req.target_path;
        debug!(
            "Received unpublish volume request with :\n\
                volume_id: {volume_id},\n\
                target_path: {target_path}"
        );

        // unmount bpffs from the container
        unmount(target_path).map_err(|e| {
            Status::new(
                5.into(),
                format!("Failed to unmount bpffs from container at {target_path}: {e}"),
            )
        })?;

        // unmount the bpffs
        let path = &Path::new(RTDIR_BPFMAN_CSI_FS).join(volume_id);
        unmount(path.to_str().unwrap()).map_err(|e| {
            Status::new(
                5.into(),
                format!("Failed to unmount bpffs at {path:?}: {e}"),
            )
        })?;

        let path = &Path::new(path);
        if path.exists() {
            remove_dir_all(path).map_err(|e| {
                Status::new(5.into(), format!("Failed to remove bpffs at {path:?}: {e}"))
            })?;
        }

        Ok(Response::new(NodeUnpublishVolumeResponse {}))
    }
    async fn node_get_volume_stats(
        &self,
        _request: Request<NodeGetVolumeStatsRequest>,
    ) -> std::result::Result<Response<NodeGetVolumeStatsResponse>, tonic::Status> {
        return Err(Status::unimplemented(""));
    }
    async fn node_expand_volume(
        &self,
        _request: Request<NodeExpandVolumeRequest>,
    ) -> std::result::Result<Response<NodeExpandVolumeResponse>, tonic::Status> {
        return Err(Status::unimplemented(""));
    }
    async fn node_get_capabilities(
        &self,
        _request: Request<NodeGetCapabilitiesRequest>,
    ) -> std::result::Result<Response<NodeGetCapabilitiesResponse>, tonic::Status> {
        return Ok(Response::new(NodeGetCapabilitiesResponse {
            capabilities: vec![NodeServiceCapability {
                r#type: Some(node_service_capability::Type::Rpc(
                    node_service_capability::Rpc {
                        r#type: node_service_capability::rpc::Type::VolumeMountGroup.into(),
                    },
                )),
            }],
        }));
    }
    // see https://github.com/container-storage-interface/spec/blob/master/spec.md#nodegetinfo
    // for more information.
    async fn node_get_info(
        &self,
        _request: Request<NodeGetInfoRequest>,
    ) -> std::result::Result<Response<NodeGetInfoResponse>, tonic::Status> {
        return Ok(Response::new(NodeGetInfoResponse {
            node_id: self.node_id.clone(),

            max_volumes_per_node: 0,
            accessible_topology: None,
        }));
    }
}

impl StorageManager {
    pub fn new(db_lock: Arc<Mutex<AsyncBpfman>>) -> Self {
        const VERSION: &str = env!("CARGO_PKG_VERSION");
        let node_id = std::env::var("KUBE_NODE_NAME")
            .expect("cannot start bpfman csi driver if KUBE_NODE_NAME not set");

        let csi_identity = CsiIdentity {
            name: DRIVER_NAME.to_string(),
            version: VERSION.to_string(),
        };

        let csi_node = CsiNode { node_id, db_lock };

        Self {
            csi_node,
            csi_identity,
        }
    }

    pub async fn run(self, mut shutdown_channel: broadcast::Receiver<()>) {
        create_dir_all(RTDIR_BPFMAN_CSI_FS)
            .context("unable to create CSI socket directory")
            .expect("cannot create csi filesystem");

        let path: &Path = Path::new(RTPATH_BPFMAN_CSI_SOCKET);
        // Listen on Unix socket
        if path.exists() {
            // Attempt to remove the socket, since bind fails if it exists
            remove_file(path).expect("Panicked cleaning up stale csi socket");
        }

        let uds = UnixListener::bind(path)
            .unwrap_or_else(|_| panic!("failed to bind {RTPATH_BPFMAN_CSI_SOCKET}"));
        let uds_stream = UnixListenerStream::new(uds);
        set_file_permissions(Path::new(RTPATH_BPFMAN_CSI_SOCKET), SOCK_MODE);

        let node_service = NodeServer::new(self.csi_node);
        let identity_service = IdentityServer::new(self.csi_identity);
        let serve = Server::builder()
            .add_service(node_service)
            .add_service(identity_service)
            .serve_with_incoming_shutdown(uds_stream, async move {
                match shutdown_channel.recv().await {
                    Ok(()) => debug!("Unix Socket: Received shutdown signal"),
                    Err(e) => error!("Error receiving shutdown signal {:?}", e),
                };
            });

        tokio::spawn(async move {
            info!("CSI Plugin Listening on {}", path.display());
            if let Err(e) = serve.await {
                eprintln!("Error = {e:?}");
            }
            info!("Shutdown CSI Plugin Handler {}", path.display());
        });
    }

    #[allow(dead_code)] // TODO: Remove this when the storage manager is fully implemented
    fn create_bpffs(&self, path: &Path) -> anyhow::Result<()> {
        create_bpffs(
            path.as_os_str()
                .to_str()
                .expect("unable to convert path to str"),
        )
    }

    #[allow(dead_code)] // TODO: Remove this when the storage manager is fully implemented
    fn pin_map_to_bpffs(&self, source_object: &MapData, dest_bpffs: &Path) -> anyhow::Result<()> {
        source_object
            .pin(dest_bpffs)
            .map_err(|e| anyhow::anyhow!("unable to pin map to bpffs: {}", e))?;
        Ok(())
    }
}

/// Unmounts whatever filesystem is mounted at `directory` (`umount(2)`).
///
/// # Errors
/// Returns the underlying errno, with the directory recorded in the context.
pub(crate) fn unmount(directory: &str) -> anyhow::Result<()> {
    debug!("Unmounting fs at {directory}");
    let outcome = umount(directory);
    let describe = || format!("unable to unmount fs at {directory}");
    outcome.with_context(describe)
}

/// Bind-mounts the directory `path` onto `target_path`.
///
/// Uses `MS_BIND` with no filesystem type and no mount data.
///
/// # Errors
/// Returns the `mount(2)` failure, with source and target recorded in the
/// context.
pub(crate) fn mount_fs_in_container(path: &str, target_path: &str) -> anyhow::Result<()> {
    debug!("Mounting {path} at {target_path}");
    // A bind mount needs neither a filesystem type nor mount data.
    let no_fstype: Option<&str> = None;
    let no_data: Option<&str> = None;

    mount(Some(path), target_path, no_fstype, MsFlags::MS_BIND, no_data)
        .with_context(|| format!("unable to mount bpffs {path} in container at {target_path}"))
}