use core::ptr::copy_nonoverlapping;
use core::time::Duration;
use std::io::Write;
use std::time::Instant;
use crate::cli::ReplayOptions;
use crate::command::get_pubsub_service_types;
use anyhow::Result;
use iceoryx2::prelude::*;
use iceoryx2::service::builder::{CustomHeaderMarker, CustomPayloadMarker};
use iceoryx2::service::static_config::message_type_details::TypeVariant;
use iceoryx2_cli::Format;
use iceoryx2_userland_record_and_replay::prelude::*;
use iceoryx2_userland_record_and_replay::record_header::{
FILE_FORMAT_HUMAN_READABLE_VERSION, FILE_FORMAT_IOX2_DUMP_VERSION, RecordHeaderDetails,
};
pub(crate) fn replay(options: ReplayOptions, _format: Format) -> Result<()> {
let node = NodeBuilder::new()
.name(&NodeName::new(&options.node_name)?)
.create::<ipc::Service>()?;
let replay = ReplayerOpener::new(&FilePath::new(options.input.as_bytes())?)
.data_representation(options.data_representation.into())
.open()?;
let service_name = match options.service {
Some(v) => ServiceName::new(&v)?,
None => replay.header().service_name,
};
let required_header = RecordHeaderDetails {
file_format_version: match options.data_representation {
crate::cli::DataRepresentation::HumanReadable => FILE_FORMAT_HUMAN_READABLE_VERSION,
crate::cli::DataRepresentation::Iox2Dump => FILE_FORMAT_IOX2_DUMP_VERSION,
},
types: get_pubsub_service_types(&service_name, &node)?,
messaging_pattern: options.messaging_pattern.into(),
};
if required_header != replay.header().details {
return Err(anyhow::anyhow!(
"The expected header {required_header:?} does not match the actual header {:?}.",
replay.header().details
));
}
let buffer = replay.read_into_buffer()?;
let service = unsafe {
node.service_builder(&service_name)
.publish_subscribe::<[CustomPayloadMarker]>()
.user_header::<CustomHeaderMarker>()
.__internal_set_payload_type_details(&required_header.types.payload)
.__internal_set_user_header_type_details(&required_header.types.user_header)
.open_or_create()?
};
let publisher = match required_header.types.payload.variant() {
TypeVariant::FixedSize => service.publisher_builder().create()?,
TypeVariant::Dynamic => service
.publisher_builder()
.initial_max_slice_len(4096)
.allocation_strategy(AllocationStrategy::PowerOfTwo)
.create()?,
};
println!("Start replaying data on \"{service_name}\".");
for n in 0..u64::MAX {
let start = Instant::now();
for data in &buffer {
let sample = unsafe {
let mut sample = publisher.loan_custom_payload(1)?;
copy_nonoverlapping(
data.payload.as_ptr(),
sample.payload_mut().as_ptr() as *mut u8,
data.payload.len(),
);
if !data.user_header.is_empty() {
copy_nonoverlapping(
data.user_header.as_ptr(),
(sample.user_header_mut() as *mut CustomHeaderMarker) as *mut u8,
data.user_header.len(),
);
}
sample.assume_init()
};
let elapsed = start.elapsed().as_millis() as f64 * options.time_factor as f64;
let timestamp = data.timestamp.as_millis() as f64 * options.time_factor as f64;
if elapsed < timestamp {
std::thread::sleep(Duration::from_millis((timestamp - elapsed) as u64));
}
sample.send()?;
print!(".");
std::io::stdout().flush()?;
}
if options.repetitions <= n {
break;
}
}
Ok(())
}