use alloc::string::String;
use alloc::vec::Vec;
use zerodds_opcua_gateway::data_value::{DataValue, Variant};
use crate::binary::{UaEncode, UaWriter, len_i32};
use crate::config::{
ConfigurationVersion, DataSetFieldContentMask, DataSetMessageContentMask, DataSetWriterConfig,
NetworkMessageContentMask, WriterGroupConfig,
};
use crate::error::EncodeError;
use crate::uadp::dataset_message::{
DataSetData, DataSetMessage, DataSetMessageKind, FieldEncoding,
};
use crate::uadp::network_message::{GroupHeader, NetworkMessage, PublisherId};
/// Builds a [`Variant`] that carries no array dimensions and no elements.
///
/// Used as the stand-in payload for a field whose `DataValue` has no value.
fn null_variant() -> Variant {
    let array_dimensions = Vec::new();
    let value = Vec::new();
    Variant {
        array_dimensions,
        value,
    }
}
/// A named, ordered set of fields, each pairing a field name with its current [`DataValue`].
///
/// `field_names` and `values` are kept index-aligned: the value stored at position `i`
/// belongs to the name stored at position `i`.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct PublishedDataSet {
/// Name supplied when the data set was constructed.
name: String,
/// Field names in insertion order.
field_names: Vec<String>,
/// Current value of every field, in the same order as `field_names`.
values: Vec<DataValue>,
}
impl PublishedDataSet {
    /// Creates a data set called `name` that has no fields yet.
    #[must_use]
    pub fn new(name: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            ..Self::default()
        }
    }

    /// Appends a field called `name` whose starting value is `initial`.
    ///
    /// Returns `&mut Self` so that several calls can be chained. No check for an
    /// already existing field of the same name is performed.
    pub fn add_field(&mut self, name: impl Into<String>, initial: DataValue) -> &mut Self {
        let field_name: String = name.into();
        self.field_names.push(field_name);
        self.values.push(initial);
        self
    }

    /// Replaces the value of the first field called `name`.
    ///
    /// Returns `true` when such a field exists and was updated, `false` otherwise.
    pub fn set(&mut self, name: &str, value: DataValue) -> bool {
        let Some(index) = self
            .field_names
            .iter()
            .position(|candidate| candidate == name)
        else {
            return false;
        };
        self.values[index] = value;
        true
    }

    /// Convenience form of [`PublishedDataSet::set`] that wraps `value` in a
    /// `DataValue` whose status, timestamps and picosecond members are all `None`.
    pub fn set_variant(&mut self, name: &str, value: Variant) -> bool {
        let wrapped = DataValue {
            value: Some(value),
            status: None,
            source_timestamp: None,
            server_timestamp: None,
            source_pico_sec: None,
            server_pico_sec: None,
        };
        self.set(name, wrapped)
    }

    /// Name of this data set.
    #[must_use]
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Field names in insertion order.
    #[must_use]
    pub fn field_names(&self) -> &[String] {
        self.field_names.as_slice()
    }

    /// Current field values, in the same order as [`PublishedDataSet::field_names`].
    #[must_use]
    pub fn values(&self) -> &[DataValue] {
        self.values.as_slice()
    }

    /// Number of field values held by the data set.
    #[must_use]
    pub fn len(&self) -> usize {
        self.values.len()
    }

    /// `true` when the data set holds no field values.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
/// Stateful producer that turns snapshots of a [`PublishedDataSet`] into
/// `DataSetMessage`s, alternating between key frames and delta frames.
#[derive(Debug, Clone)]
pub struct DataSetWriter {
/// Writer configuration: supplies the writer id, the key-frame count and the
/// field/message content masks read while producing messages.
config: DataSetWriterConfig,
/// Source of the major/minor version members emitted when the message mask asks for them.
config_version: ConfigurationVersion,
/// Number of messages produced so far (wrapping); compared against the
/// key-frame count to schedule periodic key frames.
publish_count: u32,
/// Next sequence number to emit; only advanced when the mask enables sequence numbers.
sequence_number: u16,
/// Snapshot of the field values from the previous publish, used to detect changes.
last_values: Vec<DataValue>,
/// `false` until the first message has been produced.
has_baseline: bool,
}
impl DataSetWriter {
#[must_use]
pub fn new(config: DataSetWriterConfig, config_version: ConfigurationVersion) -> Self {
Self {
config,
config_version,
publish_count: 0,
sequence_number: 0,
last_values: Vec::new(),
has_baseline: false,
}
}
#[must_use]
pub fn config(&self) -> &DataSetWriterConfig {
&self.config
}
pub fn produce(
&mut self,
dataset: &PublishedDataSet,
timestamp: Option<i64>,
) -> Result<DataSetMessage, EncodeError> {
let encoding = self.config.field_content_mask.field_encoding();
let key_frame_count = self.config.key_frame_count.max(1);
let field_count_changed = self.last_values.len() != dataset.values().len();
let is_key = !self.has_baseline
|| encoding == FieldEncoding::RawData
|| field_count_changed
|| self.publish_count % key_frame_count == 0;
let data = if is_key {
self.build_key_frame(dataset, encoding)?
} else {
self.build_delta_frame(dataset, encoding)
};
let mask = self.config.message_content_mask;
let sequence_number = mask
.contains(DataSetMessageContentMask::SEQUENCE_NUMBER)
.then(|| {
let sn = self.sequence_number;
self.sequence_number = self.sequence_number.wrapping_add(1);
sn
});
let message = DataSetMessage {
writer_id: self.config.data_set_writer_id,
valid: true,
kind: if is_key {
DataSetMessageKind::KeyFrame
} else {
DataSetMessageKind::DeltaFrame
},
sequence_number,
timestamp: mask
.contains(DataSetMessageContentMask::TIMESTAMP)
.then_some(timestamp)
.flatten(),
pico_seconds: mask
.contains(DataSetMessageContentMask::PICOSECONDS)
.then_some(0),
status: mask
.contains(DataSetMessageContentMask::STATUS)
.then_some(0),
config_major_version: mask
.contains(DataSetMessageContentMask::MAJOR_VERSION)
.then_some(self.config_version.major_version),
config_minor_version: mask
.contains(DataSetMessageContentMask::MINOR_VERSION)
.then_some(self.config_version.minor_version),
data,
};
self.last_values = dataset.values().to_vec();
self.has_baseline = true;
self.publish_count = self.publish_count.wrapping_add(1);
Ok(message)
}
fn build_key_frame(
&self,
dataset: &PublishedDataSet,
encoding: FieldEncoding,
) -> Result<DataSetData, EncodeError> {
Ok(match encoding {
FieldEncoding::Variant => {
DataSetData::Variant(dataset.values().iter().map(field_variant).collect())
}
FieldEncoding::DataValue => DataSetData::DataValue(
dataset
.values()
.iter()
.map(|dv| project_data_value(dv, self.config.field_content_mask))
.collect(),
),
FieldEncoding::RawData => DataSetData::Raw(encode_raw_fields(dataset.values())?),
})
}
fn build_delta_frame(
&self,
dataset: &PublishedDataSet,
encoding: FieldEncoding,
) -> DataSetData {
let changed = dataset
.values()
.iter()
.enumerate()
.filter(|(i, dv)| self.last_values.get(*i) != Some(*dv));
match encoding {
FieldEncoding::DataValue => DataSetData::DeltaDataValue(
changed
.map(|(i, dv)| {
(
i as u16,
project_data_value(dv, self.config.field_content_mask),
)
})
.collect(),
),
_ => DataSetData::DeltaVariant(
changed
.map(|(i, dv)| (i as u16, field_variant(dv)))
.collect(),
),
}
}
}
fn field_variant(dv: &DataValue) -> Variant {
dv.value.clone().unwrap_or_else(null_variant)
}
/// Produces a copy of `dv` restricted to the members selected by `mask`.
///
/// The value is always copied. Each of the status, timestamp and picosecond
/// members is `Some` exactly when its flag is set in `mask`; a selected member
/// that is absent in `dv` is emitted as `0`.
fn project_data_value(dv: &DataValue, mask: DataSetFieldContentMask) -> DataValue {
    let value = dv.value.clone();

    let status = if mask.contains(DataSetFieldContentMask::STATUS_CODE) {
        Some(dv.status.unwrap_or(0))
    } else {
        None
    };
    let source_timestamp = if mask.contains(DataSetFieldContentMask::SOURCE_TIMESTAMP) {
        Some(dv.source_timestamp.unwrap_or(0))
    } else {
        None
    };
    let server_timestamp = if mask.contains(DataSetFieldContentMask::SERVER_TIMESTAMP) {
        Some(dv.server_timestamp.unwrap_or(0))
    } else {
        None
    };
    let source_pico_sec = if mask.contains(DataSetFieldContentMask::SOURCE_PICOSECONDS) {
        Some(dv.source_pico_sec.unwrap_or(0))
    } else {
        None
    };
    let server_pico_sec = if mask.contains(DataSetFieldContentMask::SERVER_PICOSECONDS) {
        Some(dv.server_pico_sec.unwrap_or(0))
    } else {
        None
    };

    DataValue {
        value,
        status,
        source_timestamp,
        server_timestamp,
        source_pico_sec,
        server_pico_sec,
    }
}
/// Serialises the values of all fields back to back into one byte buffer.
///
/// A scalar field is written as its single element. A one-dimensional array is
/// written as an `i32` element count followed by every element.
///
/// # Errors
///
/// Returns `EncodeError::ValueOutOfRange` when a field has no value, when a
/// scalar holds no element, or when a field is a multi-dimensional array.
/// Errors from `len_i32` and from encoding an element are passed through.
fn encode_raw_fields(values: &[DataValue]) -> Result<Vec<u8>, EncodeError> {
    let mut writer = UaWriter::new();

    for field in values {
        let Some(variant) = field.value.as_ref() else {
            return Err(EncodeError::ValueOutOfRange {
                message: "RawData field has no value",
            });
        };

        if variant.is_scalar() {
            let Some(element) = variant.value.first() else {
                return Err(EncodeError::ValueOutOfRange {
                    message: "RawData scalar field is empty",
                });
            };
            element.encode(&mut writer)?;
            continue;
        }

        if !variant.is_1d_array() {
            return Err(EncodeError::ValueOutOfRange {
                message: "RawData encoding of multi-dimensional arrays is not supported",
            });
        }

        // Length prefix first, then the elements in order.
        let element_count = len_i32("RawData array", variant.value.len())?;
        writer.write_i32(element_count);
        for element in &variant.value {
            element.encode(&mut writer)?;
        }
    }

    Ok(writer.into_vec())
}
/// Stateful framer that wraps `DataSetMessage`s into a `NetworkMessage`
/// according to the group's network message content mask.
#[derive(Debug, Clone)]
pub struct WriterGroup {
/// Group configuration: supplies the content mask, writer group id and group version.
config: WriterGroupConfig,
/// Publisher id copied into a network message when the mask enables it.
publisher_id: PublisherId,
/// Next network message number; advanced only when it is emitted in a group header.
network_message_number: u16,
/// Next group sequence number; advanced only when it is emitted in a group header.
sequence_number: u16,
}
impl WriterGroup {
    /// Creates a writer group whose network message number and sequence number
    /// both start at zero.
    #[must_use]
    pub fn new(config: WriterGroupConfig, publisher_id: PublisherId) -> Self {
        let (network_message_number, sequence_number) = (0, 0);
        Self {
            config,
            publisher_id,
            network_message_number,
            sequence_number,
        }
    }

    /// Returns the configuration of this writer group.
    #[must_use]
    pub fn config(&self) -> &WriterGroupConfig {
        &self.config
    }

    /// Wraps `messages` into a [`NetworkMessage`].
    ///
    /// Every optional header member is emitted only when its flag is set in the
    /// network message content mask. The members of the group header are looked
    /// at only when the group header itself is enabled, and the two counters
    /// advance only when they are actually emitted. `timestamp` is forwarded
    /// only when the timestamp flag is set; picoseconds are emitted as `0`.
    /// The payload header is enabled by the mask or whenever more than one
    /// message is framed.
    pub fn frame(
        &mut self,
        messages: Vec<DataSetMessage>,
        timestamp: Option<i64>,
    ) -> NetworkMessage {
        let mask = self.config.network_message_content_mask;

        let group_header = if mask.contains(NetworkMessageContentMask::GROUP_HEADER) {
            let network_message_number =
                if mask.contains(NetworkMessageContentMask::NETWORK_MESSAGE_NUMBER) {
                    Some(Self::post_increment(&mut self.network_message_number))
                } else {
                    None
                };
            let sequence_number = if mask.contains(NetworkMessageContentMask::SEQUENCE_NUMBER) {
                Some(Self::post_increment(&mut self.sequence_number))
            } else {
                None
            };
            let writer_group_id = if mask.contains(NetworkMessageContentMask::WRITER_GROUP_ID) {
                Some(self.config.writer_group_id)
            } else {
                None
            };
            let group_version = if mask.contains(NetworkMessageContentMask::GROUP_VERSION) {
                Some(self.config.group_version)
            } else {
                None
            };
            Some(GroupHeader {
                writer_group_id,
                group_version,
                network_message_number,
                sequence_number,
            })
        } else {
            None
        };

        let payload_header =
            mask.contains(NetworkMessageContentMask::PAYLOAD_HEADER) || messages.len() > 1;

        let publisher_id = if mask.contains(NetworkMessageContentMask::PUBLISHER_ID) {
            Some(self.publisher_id.clone())
        } else {
            None
        };
        let timestamp = if mask.contains(NetworkMessageContentMask::TIMESTAMP) {
            timestamp
        } else {
            None
        };
        let pico_seconds = if mask.contains(NetworkMessageContentMask::PICOSECONDS) {
            Some(0)
        } else {
            None
        };

        NetworkMessage {
            publisher_id,
            data_set_class_id: None,
            group_header,
            timestamp,
            pico_seconds,
            promoted_fields: Vec::new(),
            payload_header,
            messages,
        }
    }

    /// Returns the current value of `counter` and advances it by one, wrapping
    /// around on overflow.
    fn post_increment(counter: &mut u16) -> u16 {
        let current = *counter;
        *counter = current.wrapping_add(1);
        current
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::uadp::dataset_message::DataSetData;
    use zerodds_opcua_gateway::data_value::VariantValue;

    /// Scalar `Int32` variant used as a field payload throughout these tests.
    fn int_variant(v: i32) -> Variant {
        Variant::scalar(VariantValue::Int32(v))
    }

    /// `DataValue` that carries only an `Int32` value and no metadata.
    fn dv_int(v: i32) -> DataValue {
        DataValue {
            value: Some(int_variant(v)),
            status: None,
            source_timestamp: None,
            server_timestamp: None,
            source_pico_sec: None,
            server_pico_sec: None,
        }
    }

    /// Data set `ds1` with the two fields `a = 1` and `b = 2`.
    fn sample_dataset() -> PublishedDataSet {
        let mut pds = PublishedDataSet::new("ds1");
        pds.add_field("a", dv_int(1));
        pds.add_field("b", dv_int(2));
        pds
    }

    /// Writer over `cfg` with the default configuration version.
    fn writer_with(cfg: DataSetWriterConfig) -> DataSetWriter {
        DataSetWriter::new(cfg, ConfigurationVersion::default())
    }

    #[test]
    fn first_publish_is_key_frame() {
        let mut w = writer_with(DataSetWriterConfig::new("w1", 5, "ds1"));

        let msg = w.produce(&sample_dataset(), None).expect("produce");

        let expected = DataSetData::Variant(alloc::vec![int_variant(1), int_variant(2)]);
        assert_eq!(msg.kind, DataSetMessageKind::KeyFrame);
        assert_eq!(msg.writer_id, 5);
        assert_eq!(msg.data, expected);
    }

    #[test]
    fn second_publish_is_delta_with_only_changed_fields() {
        let mut cfg = DataSetWriterConfig::new("w1", 5, "ds1");
        cfg.key_frame_count = 10;
        let mut w = writer_with(cfg);
        let mut pds = sample_dataset();

        let _key = w.produce(&pds, None).expect("key");
        pds.set("b", dv_int(99));
        let delta = w.produce(&pds, None).expect("delta");

        let expected = DataSetData::DeltaVariant(alloc::vec![(1, int_variant(99))]);
        assert_eq!(delta.kind, DataSetMessageKind::DeltaFrame);
        assert_eq!(delta.data, expected);
    }

    #[test]
    fn key_frame_count_forces_periodic_key() {
        let mut cfg = DataSetWriterConfig::new("w1", 1, "ds1");
        cfg.key_frame_count = 2;
        let mut w = writer_with(cfg);
        let pds = sample_dataset();

        // With a key-frame count of 2 the kinds alternate, starting with a key frame.
        let schedule = [
            ("p0", DataSetMessageKind::KeyFrame),
            ("p1", DataSetMessageKind::DeltaFrame),
            ("p2", DataSetMessageKind::KeyFrame),
        ];
        for (label, expected_kind) in schedule {
            assert_eq!(w.produce(&pds, None).expect(label).kind, expected_kind);
        }
    }

    #[test]
    fn data_value_mask_projects_selected_members() {
        let mut cfg = DataSetWriterConfig::new("w1", 1, "ds1");
        cfg.field_content_mask = DataSetFieldContentMask::from_bits(
            DataSetFieldContentMask::STATUS_CODE | DataSetFieldContentMask::SOURCE_TIMESTAMP,
        );
        let mut w = writer_with(cfg);

        let fully_populated = DataValue {
            value: Some(int_variant(7)),
            status: Some(0x8000_0000),
            source_timestamp: Some(123),
            server_timestamp: Some(456),
            source_pico_sec: Some(9),
            server_pico_sec: Some(9),
        };
        let mut pds = PublishedDataSet::new("ds1");
        pds.add_field("a", fully_populated);

        let msg = w.produce(&pds, None).expect("produce");
        let fields = match msg.data {
            DataSetData::DataValue(fields) => fields,
            _ => panic!("expected DataValue encoding"),
        };

        // Only the two members selected by the mask survive the projection.
        let projected = &fields[0];
        assert_eq!(projected.status, Some(0x8000_0000));
        assert_eq!(projected.source_timestamp, Some(123));
        assert_eq!(projected.server_timestamp, None);
        assert_eq!(projected.source_pico_sec, None);
        assert_eq!(projected.server_pico_sec, None);
    }

    #[test]
    fn raw_data_is_always_key_frame() {
        let mut cfg = DataSetWriterConfig::new("w1", 1, "ds1");
        cfg.field_content_mask = DataSetFieldContentMask::raw_data();
        cfg.key_frame_count = 1;
        let mut w = writer_with(cfg);
        let pds = sample_dataset();

        let m0 = w.produce(&pds, None).expect("p0");
        let m1 = w.produce(&pds, None).expect("p1");

        assert_eq!(m0.kind, DataSetMessageKind::KeyFrame);
        assert_eq!(m1.kind, DataSetMessageKind::KeyFrame);
        assert!(matches!(m0.data, DataSetData::Raw(_)));
    }

    #[test]
    fn sequence_number_emitted_and_rolls_when_masked() {
        let mut cfg = DataSetWriterConfig::new("w1", 1, "ds1");
        cfg.message_content_mask =
            DataSetMessageContentMask::from_bits(DataSetMessageContentMask::SEQUENCE_NUMBER);
        let mut w = writer_with(cfg);
        let pds = sample_dataset();

        let first = w.produce(&pds, None).expect("p0");
        assert_eq!(first.sequence_number, Some(0));
        let second = w.produce(&pds, None).expect("p1");
        assert_eq!(second.sequence_number, Some(1));
    }

    #[test]
    fn writer_group_frames_with_group_header_and_publisher() {
        let mut cfg = WriterGroupConfig::new("g1", 42);
        cfg.network_message_content_mask = NetworkMessageContentMask::from_bits(
            NetworkMessageContentMask::PUBLISHER_ID
                | NetworkMessageContentMask::GROUP_HEADER
                | NetworkMessageContentMask::WRITER_GROUP_ID
                | NetworkMessageContentMask::SEQUENCE_NUMBER
                | NetworkMessageContentMask::PAYLOAD_HEADER,
        );
        let mut group = WriterGroup::new(cfg, PublisherId::UInt16(7));
        let msg = DataSetMessage::key_frame_variant(5, alloc::vec![int_variant(1)]);

        let nm0 = group.frame(alloc::vec![msg.clone()], None);
        assert_eq!(nm0.publisher_id, Some(PublisherId::UInt16(7)));
        let gh = nm0.group_header.expect("group header");
        assert_eq!(gh.writer_group_id, Some(42));
        assert_eq!(gh.sequence_number, Some(0));

        let nm1 = group.frame(alloc::vec![msg], None);
        assert_eq!(nm1.group_header.expect("gh").sequence_number, Some(1));
    }
}