1use libc::uintptr_t;
23use serde::{Deserialize, Serialize};
24use std::collections::{HashMap, HashSet};
25use std::ffi::{CStr, CString};
26use std::fmt;
27use std::marker::PhantomData;
28use std::ptr;
29use std::ptr::NonNull;
30use std::sync::{Arc, RwLock};
31use thiserror::Error;
32
33mod bindings {
35 #![allow(non_upper_case_globals)]
36 #![allow(non_camel_case_types)]
37 #![allow(non_snake_case)]
38 #![allow(dead_code)]
39 include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
40}
41
42use bindings::{
44 nixl_capi_create_agent, nixl_capi_create_backend, nixl_capi_create_notif_map,
45 nixl_capi_create_opt_args, nixl_capi_create_reg_dlist, nixl_capi_create_xfer_dlist,
46 nixl_capi_deregister_mem, nixl_capi_destroy_agent, nixl_capi_destroy_backend,
47 nixl_capi_destroy_mem_list, nixl_capi_destroy_notif_map, nixl_capi_destroy_opt_args,
48 nixl_capi_destroy_params, nixl_capi_destroy_reg_dlist, nixl_capi_destroy_string_list,
49 nixl_capi_destroy_xfer_dlist, nixl_capi_get_available_plugins, nixl_capi_get_backend_params,
50 nixl_capi_get_local_md, nixl_capi_get_notifs, nixl_capi_get_plugin_params,
51 nixl_capi_get_xfer_status, nixl_capi_invalidate_remote_md, nixl_capi_load_remote_md,
52 nixl_capi_mem_list_get, nixl_capi_mem_list_is_empty, nixl_capi_mem_list_size,
53 nixl_capi_mem_type_t, nixl_capi_mem_type_to_string, nixl_capi_notif_map_clear,
54 nixl_capi_notif_map_get_agent_at, nixl_capi_notif_map_get_notif,
55 nixl_capi_notif_map_get_notifs_size, nixl_capi_notif_map_size, nixl_capi_opt_args_add_backend,
56 nixl_capi_opt_args_get_has_notif, nixl_capi_opt_args_get_notif_msg,
57 nixl_capi_opt_args_get_skip_desc_merge, nixl_capi_opt_args_set_has_notif,
58 nixl_capi_opt_args_set_notif_msg, nixl_capi_opt_args_set_skip_desc_merge,
59 nixl_capi_params_create_iterator, nixl_capi_params_destroy_iterator, nixl_capi_params_is_empty,
60 nixl_capi_params_iterator_next, nixl_capi_post_xfer_req, nixl_capi_reg_dlist_add_desc,
61 nixl_capi_reg_dlist_clear, nixl_capi_register_mem, nixl_capi_string_list_get,
62 nixl_capi_string_list_size, nixl_capi_xfer_dlist_add_desc, nixl_capi_xfer_dlist_clear,
63 nixl_capi_agent_make_connection,
64 nixl_capi_reg_dlist_print, nixl_capi_xfer_dlist_print, nixl_capi_gen_notif, nixl_capi_estimate_xfer_cost,
65 nixl_capi_query_mem, nixl_capi_create_query_resp_list, nixl_capi_destroy_query_resp_list,
66 nixl_capi_query_resp_list_size, nixl_capi_query_resp_list_has_value,
67 nixl_capi_query_resp_list_get_params, nixl_capi_prep_xfer_dlist, nixl_capi_release_xfer_dlist_handle,
68 nixl_capi_make_xfer_req, nixl_capi_get_local_partial_md,
69 nixl_capi_send_local_partial_md, nixl_capi_query_xfer_backend, nixl_capi_opt_args_set_ip_addr,
70 nixl_capi_opt_args_set_port, nixl_capi_get_xfer_telemetry,
71 nixl_capi_create_params, nixl_capi_params_add, nixl_capi_is_stub
72};
73
74pub use bindings::{
76 nixl_capi_status_t_NIXL_CAPI_ERROR_BACKEND as NIXL_CAPI_ERROR_BACKEND,
77 nixl_capi_status_t_NIXL_CAPI_ERROR_INVALID_PARAM as NIXL_CAPI_ERROR_INVALID_PARAM,
78 nixl_capi_status_t_NIXL_CAPI_IN_PROG as NIXL_CAPI_IN_PROG,
79 nixl_capi_status_t_NIXL_CAPI_SUCCESS as NIXL_CAPI_SUCCESS,
80 nixl_capi_status_t_NIXL_CAPI_ERROR_NO_TELEMETRY as NIXL_CAPI_ERROR_NO_TELEMETRY
81};
82
83mod agent;
84mod descriptors;
85mod notify;
86mod utils;
87mod xfer;
88
89pub use agent::*;
90pub use descriptors::*;
91pub use notify::*;
92pub use utils::*;
93pub use xfer::*;
94
95#[derive(Error, Debug)]
97pub enum NixlError {
98 #[error("Invalid parameter provided to NIXL")]
99 InvalidParam,
100 #[error("Backend error occurred")]
101 BackendError,
102 #[error("Failed to create CString from input: {0}")]
103 StringConversionError(#[from] std::ffi::NulError),
104 #[error("Index out of bounds")]
105 IndexOutOfBounds,
106 #[error("Invalid data pointer")]
107 InvalidDataPointer,
108 #[error("Failed to create XferRequest")]
109 FailedToCreateXferRequest,
110 #[error("Failed to create registration descriptor list")]
111 RegDescListCreationFailed,
112 #[error("Failed to add registration descriptor")]
113 RegDescAddFailed,
114 #[error("Failed to create XferDlistHandle")]
115 FailedToCreateXferDlistHandle,
116 #[error("Failed to create backend")]
117 FailedToCreateBackend,
118 #[error("Telemetry is not enabled or transfer is not complete")]
119 NoTelemetry,
120}
121
122pub struct MemList {
124 inner: NonNull<bindings::nixl_capi_mem_list_s>,
125}
126
127impl Drop for MemList {
128 fn drop(&mut self) {
129 unsafe {
131 nixl_capi_destroy_mem_list(self.inner.as_ptr());
132 }
133 }
134}
135
136#[derive(Debug)]
137pub struct RegistrationHandle {
138 agent: Option<Arc<RwLock<AgentInner>>>,
139 ptr: usize,
140 size: usize,
141 dev_id: u64,
142 mem_type: MemType,
143}
144
145impl RegistrationHandle {
146 pub fn agent_name(&self) -> Option<String> {
147 self.agent
148 .as_ref()
149 .map(|agent| agent.read().unwrap().name.clone())
150 }
151
152 pub fn deregister(&mut self) -> Result<(), NixlError> {
153 if let Some(agent) = self.agent.take() {
154 tracing::trace!(
155 ptr = self.ptr,
156 size = self.size,
157 dev_id = self.dev_id,
158 mem_type = ?self.mem_type,
159 "Deregistering memory"
160 );
161 let mut reg_dlist = RegDescList::new(self.mem_type)?;
162 unsafe {
163 reg_dlist.add_desc(self.ptr, self.size, self.dev_id);
164 let _opt_args = OptArgs::new().unwrap();
165 nixl_capi_deregister_mem(
166 agent.write().unwrap().handle.as_ptr(),
167 reg_dlist.handle(),
168 _opt_args.inner.as_ptr(),
169 );
170 }
171 tracing::trace!("Memory deregistered successfully");
172 }
173 Ok(())
174 }
175}
176
177impl Drop for RegistrationHandle {
178 fn drop(&mut self) {
179 tracing::trace!(
180 ptr = self.ptr,
181 size = self.size,
182 dev_id = self.dev_id,
183 mem_type = ?self.mem_type,
184 "Dropping registration handle"
185 );
186 if let Err(e) = self.deregister() {
187 tracing::debug!(error = ?e, "Failed to deregister memory");
188 }
189 }
190}
191
192#[derive(Debug)]
194pub struct Backend {
195 inner: NonNull<bindings::nixl_capi_backend_s>,
196}
197
198unsafe impl Send for Backend {}
199unsafe impl Sync for Backend {}
200
/// Timing and volume figures for a single transfer.
///
/// All `*_us` fields are expressed in microseconds.
#[derive(Debug)]
pub struct XferTelemetry {
    /// Transfer start timestamp in microseconds.
    /// NOTE(review): the reference point (epoch) is not visible here.
    pub start_time_us: u64,
    /// Time spent posting the transfer, in microseconds.
    pub post_duration_us: u64,
    /// Time spent executing the transfer, in microseconds.
    pub xfer_duration_us: u64,
    /// Number of bytes moved by the transfer.
    pub total_bytes: u64,
    /// Number of descriptors in the transfer.
    pub desc_count: u64,
}

impl XferTelemetry {
    /// Returns `start_time_us` as a [`std::time::Duration`].
    pub fn start_time(&self) -> std::time::Duration {
        std::time::Duration::from_micros(self.start_time_us)
    }

    /// Returns `post_duration_us` as a [`std::time::Duration`].
    pub fn post_duration(&self) -> std::time::Duration {
        std::time::Duration::from_micros(self.post_duration_us)
    }

    /// Returns `xfer_duration_us` as a [`std::time::Duration`].
    pub fn xfer_duration(&self) -> std::time::Duration {
        std::time::Duration::from_micros(self.xfer_duration_us)
    }

    /// Returns the combined posting and transfer time.
    ///
    /// The two parts are added as `Duration`s rather than as raw `u64`
    /// microsecond counts: the integer sum can overflow, whereas the sum of
    /// two `Duration::from_micros` values always fits in a `Duration`.
    pub fn total_duration(&self) -> std::time::Duration {
        self.post_duration() + self.xfer_duration()
    }

    /// Returns the transfer rate in bytes per second.
    ///
    /// The rate is `total_bytes` divided by the transfer duration in seconds;
    /// posting time is not included. Returns `0.0` when the transfer duration
    /// is zero, so the result is never infinite or NaN.
    pub fn transfer_rate_bps(&self) -> f64 {
        if self.xfer_duration_us == 0 {
            return 0.0;
        }
        let seconds = self.xfer_duration_us as f64 / 1_000_000.0;
        self.total_bytes as f64 / seconds
    }
}
246
247pub struct OptArgs {
249 inner: NonNull<bindings::nixl_capi_opt_args_s>,
250}
251
252impl OptArgs {
253 pub fn new() -> Result<Self, NixlError> {
255 let mut args = ptr::null_mut();
256
257 let status = unsafe { nixl_capi_create_opt_args(&mut args) };
258
259 match status {
260 0 => {
261 let inner = unsafe { NonNull::new_unchecked(args) };
263 Ok(Self { inner })
264 }
265 -1 => Err(NixlError::InvalidParam),
266 _ => Err(NixlError::BackendError),
267 }
268 }
269
270 pub fn add_backend(&mut self, backend: &Backend) -> Result<(), NixlError> {
272 let status =
273 unsafe { nixl_capi_opt_args_add_backend(self.inner.as_ptr(), backend.inner.as_ptr()) };
274 match status {
275 NIXL_CAPI_SUCCESS => Ok(()),
276 NIXL_CAPI_ERROR_INVALID_PARAM => Err(NixlError::InvalidParam),
277 _ => Err(NixlError::BackendError),
278 }
279 }
280
281 pub fn set_notification_message(&mut self, message: &[u8]) -> Result<(), NixlError> {
283 let status = unsafe {
284 nixl_capi_opt_args_set_notif_msg(
285 self.inner.as_ptr(),
286 message.as_ptr() as *const _,
287 message.len(),
288 )
289 };
290 match status {
291 NIXL_CAPI_SUCCESS => Ok(()),
292 NIXL_CAPI_ERROR_INVALID_PARAM => Err(NixlError::InvalidParam),
293 _ => Err(NixlError::BackendError),
294 }
295 }
296
297 pub fn get_notification_message(&self) -> Result<Vec<u8>, NixlError> {
299 let mut data = ptr::null_mut();
300 let mut len = 0;
301 let status =
302 unsafe { nixl_capi_opt_args_get_notif_msg(self.inner.as_ptr(), &mut data, &mut len) };
303
304 match status {
305 NIXL_CAPI_SUCCESS => {
306 if data.is_null() {
307 Ok(Vec::new())
308 } else {
309 let message = unsafe {
311 let slice = std::slice::from_raw_parts(data as *const u8, len);
312 let vec = slice.to_vec();
313 libc::free(data as *mut _);
314 vec
315 };
316 Ok(message)
317 }
318 }
319 NIXL_CAPI_ERROR_INVALID_PARAM => Err(NixlError::InvalidParam),
320 _ => Err(NixlError::BackendError),
321 }
322 }
323
324 pub fn set_has_notification(&mut self, has_notification: bool) -> Result<(), NixlError> {
326 let status =
327 unsafe { nixl_capi_opt_args_set_has_notif(self.inner.as_ptr(), has_notification) };
328 match status {
329 NIXL_CAPI_SUCCESS => Ok(()),
330 NIXL_CAPI_ERROR_INVALID_PARAM => Err(NixlError::InvalidParam),
331 _ => Err(NixlError::BackendError),
332 }
333 }
334
335 pub fn has_notification(&self) -> Result<bool, NixlError> {
337 let mut has_notification = false;
338 let status =
339 unsafe { nixl_capi_opt_args_get_has_notif(self.inner.as_ptr(), &mut has_notification) };
340 match status {
341 NIXL_CAPI_SUCCESS => Ok(has_notification),
342 NIXL_CAPI_ERROR_INVALID_PARAM => Err(NixlError::InvalidParam),
343 _ => Err(NixlError::BackendError),
344 }
345 }
346
347 pub fn set_skip_descriptor_merge(&mut self, skip_merge: bool) -> Result<(), NixlError> {
349 let status =
350 unsafe { nixl_capi_opt_args_set_skip_desc_merge(self.inner.as_ptr(), skip_merge) };
351 match status {
352 NIXL_CAPI_SUCCESS => Ok(()),
353 NIXL_CAPI_ERROR_INVALID_PARAM => Err(NixlError::InvalidParam),
354 _ => Err(NixlError::BackendError),
355 }
356 }
357
358 pub fn skip_descriptor_merge(&self) -> Result<bool, NixlError> {
360 let mut skip_merge = false;
361 let status =
362 unsafe { nixl_capi_opt_args_get_skip_desc_merge(self.inner.as_ptr(), &mut skip_merge) };
363 match status {
364 NIXL_CAPI_SUCCESS => Ok(skip_merge),
365 NIXL_CAPI_ERROR_INVALID_PARAM => Err(NixlError::InvalidParam),
366 _ => Err(NixlError::BackendError),
367 }
368 }
369
370 pub fn set_ip_addr(&mut self, ip_addr: &str) -> Result<(), NixlError> {
373 let c_str = CString::new(ip_addr).expect("Failed to convert string to CString");
374 let status = unsafe { nixl_capi_opt_args_set_ip_addr(self.inner.as_ptr(), c_str.as_ptr()) };
375 match status {
376 NIXL_CAPI_SUCCESS => Ok(()),
377 NIXL_CAPI_ERROR_INVALID_PARAM => Err(NixlError::InvalidParam),
378 _ => Err(NixlError::BackendError),
379 }
380 }
381
382 pub fn set_port(&mut self, port: u16) -> Result<(), NixlError> {
385 let status = unsafe { nixl_capi_opt_args_set_port(self.inner.as_ptr(), port) };
386 match status {
387 NIXL_CAPI_SUCCESS => Ok(()),
388 NIXL_CAPI_ERROR_INVALID_PARAM => Err(NixlError::InvalidParam),
389 _ => Err(NixlError::BackendError),
390 }
391 }
392}
393
394impl Drop for OptArgs {
395 fn drop(&mut self) {
396 tracing::trace!("Dropping optional arguments");
397 unsafe {
398 nixl_capi_destroy_opt_args(self.inner.as_ptr());
399 }
400 tracing::trace!("Optional arguments dropped");
401 }
402}
403
404impl MemList {
405 pub fn is_empty(&self) -> Result<bool, NixlError> {
407 let mut is_empty = false;
408
409 let status = unsafe { nixl_capi_mem_list_is_empty(self.inner.as_ptr(), &mut is_empty) };
411
412 match status {
413 0 => Ok(is_empty),
414 -1 => Err(NixlError::InvalidParam),
415 _ => Err(NixlError::BackendError),
416 }
417 }
418
419 pub fn len(&self) -> Result<usize, NixlError> {
421 let mut size = 0;
422
423 let status = unsafe { nixl_capi_mem_list_size(self.inner.as_ptr(), &mut size) };
425
426 match status {
427 0 => Ok(size),
428 -1 => Err(NixlError::InvalidParam),
429 _ => Err(NixlError::BackendError),
430 }
431 }
432
433 pub fn get(&self, index: usize) -> Result<MemType, NixlError> {
435 let mut mem_type = 0;
436
437 let status = unsafe { nixl_capi_mem_list_get(self.inner.as_ptr(), index, &mut mem_type) };
439
440 match status {
441 0 => Ok(MemType::from(mem_type)),
442 -1 => Err(NixlError::InvalidParam),
443 _ => Err(NixlError::BackendError),
444 }
445 }
446
447 pub fn iter(&self) -> MemListIterator<'_> {
449 MemListIterator {
450 list: self,
451 index: 0,
452 length: self.len().unwrap_or(0),
453 }
454 }
455}
456
457pub struct MemListIterator<'a> {
459 list: &'a MemList,
460 index: usize,
461 length: usize,
462}
463
464impl Iterator for MemListIterator<'_> {
465 type Item = Result<MemType, NixlError>;
466
467 fn next(&mut self) -> Option<Self::Item> {
468 if self.index >= self.length {
469 None
470 } else {
471 let result = self.list.get(self.index);
472 self.index += 1;
473 Some(result)
474 }
475 }
476
477 fn size_hint(&self) -> (usize, Option<usize>) {
478 let remaining = self.length - self.index;
479 (remaining, Some(remaining))
480 }
481}
482
/// A contiguous region of memory described by a start pointer and a length.
///
/// Implementors must be `Debug`, `Send` and `Sync`.
pub trait MemoryRegion: std::fmt::Debug + Send + Sync {
    /// Returns the length of the region in bytes.
    fn size(&self) -> usize;

    /// Returns a raw pointer to the start of the region.
    ///
    /// # Safety
    ///
    /// The exact contract is not spelled out in this file. Presumably the
    /// caller must not use the pointer after the region is gone and must
    /// synchronize any access made through it — TODO confirm.
    unsafe fn as_ptr(&self) -> *const u8;
}
496
497pub trait NixlDescriptor: MemoryRegion {
499 fn mem_type(&self) -> MemType;
501
502 fn device_id(&self) -> u64;
504}
505
506pub trait NixlRegistration: NixlDescriptor {
508 fn register(&mut self, agent: &Agent, opt_args: Option<&OptArgs>) -> Result<(), NixlError>;
509}
510
511#[derive(Debug)]
513pub struct SystemStorage {
514 data: Vec<u8>,
515 handle: Option<RegistrationHandle>,
516}
517
518impl SystemStorage {
519 pub fn new(size: usize) -> Result<Self, NixlError> {
521 let data = vec![0; size];
522 Ok(Self { data, handle: None })
523 }
524
525 pub fn memset(&mut self, value: u8) {
527 self.data.fill(value);
528 }
529
530 pub fn as_slice(&self) -> &[u8] {
532 &self.data
533 }
534}
535
536impl MemoryRegion for SystemStorage {
537 fn size(&self) -> usize {
538 self.data.len()
539 }
540
541 unsafe fn as_ptr(&self) -> *const u8 {
542 self.data.as_ptr()
543 }
544}
545
546impl NixlDescriptor for SystemStorage {
547 fn mem_type(&self) -> MemType {
548 MemType::Dram
549 }
550
551 fn device_id(&self) -> u64 {
552 0
553 }
554}
555
556impl NixlRegistration for SystemStorage {
557 fn register(&mut self, agent: &Agent, opt_args: Option<&OptArgs>) -> Result<(), NixlError> {
558 let handle = agent.register_memory(self, opt_args)?;
559 self.handle = Some(handle);
560 Ok(())
561 }
562}
563
564pub fn is_stub() -> bool {
565 unsafe { nixl_capi_is_stub() }
566}