mod manifest;
pub use manifest::*;
/// Deprecated alias for [`DataSource`], which is the current name of this type.
#[deprecated(since = "0.20.0", note = "Renamed to DataSource")]
pub type UpstreamSource = DataSource;
use std::num::NonZeroU16;
use crate::Encode;
/// Accumulates topics together with the schemas they reference.
///
/// Each distinct schema is stored once and identified by a non-zero 16-bit
/// ID; topics refer to their schema through that ID.
pub struct ChannelSet {
// Topics in the order they were inserted.
topics: Vec<Topic>,
// Distinct schemas collected so far, each carrying its assigned ID.
schemas: Vec<Schema>,
// ID to hand to the next new schema; `None` once every ID up to 65535 has
// been assigned.
next_schema_id: Option<NonZeroU16>,
}
impl ChannelSet {
pub fn new() -> Self {
Self {
topics: Vec::new(),
schemas: Vec::new(),
next_schema_id: Some(NonZeroU16::MIN),
}
}
pub fn insert<T: Encode>(&mut self, topic: impl Into<String>) {
let schema_id = T::get_schema().map(|s| self.add_schema(s));
self.topics.push(Topic {
name: topic.into(),
message_encoding: T::get_message_encoding(),
schema_id,
});
}
pub fn into_topics_and_schemas(self) -> (Vec<Topic>, Vec<Schema>) {
(self.topics, self.schemas)
}
fn add_schema(&mut self, schema: crate::Schema) -> NonZeroU16 {
let existing = self.schemas.iter().find(|existing| {
existing.name == schema.name
&& existing.encoding == schema.encoding
&& existing.data.as_ref() == schema.data.as_ref()
});
if let Some(existing) = existing {
existing.id
} else {
let id = self
.next_schema_id
.expect("should not add more than 65535 schemas");
self.next_schema_id = id.checked_add(1);
self.schemas.push(Schema {
id,
name: schema.name,
encoding: schema.encoding,
data: schema.data.into(),
});
id
}
}
}
impl Default for ChannelSet {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{TimeZone, Utc};

    /// Builds a manifest with one streamed source from a `ChannelSet` and
    /// compares its JSON form against the stored snapshot.
    #[test]
    fn test_streamed_source_builder_snapshot() {
        let mut channel_set = ChannelSet::new();
        for topic_name in ["/topic1", "/topic2"] {
            channel_set.insert::<crate::messages::Vector3>(topic_name);
        }

        let (topics, schemas) = channel_set.into_topics_and_schemas();
        // Both topics use the same message type, so only one schema is kept.
        assert_eq!(topics.len(), 2);
        assert_eq!(schemas.len(), 1);

        let start_time = Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap();
        let end_time = Utc.with_ymd_and_hms(2024, 1, 2, 0, 0, 0).unwrap();
        let streamed = StreamedSource {
            url: "/v1/data".into(),
            id: Some("test-id".into()),
            topics,
            schemas,
            start_time,
            end_time,
        };

        let manifest = Manifest {
            name: Some("Test Source".into()),
            sources: vec![DataSource::Streamed(streamed)],
        };
        insta::assert_json_snapshot!(manifest);
    }
}