/// Registers a [`DataLoader`] implementation with the generated `loader` guest bindings.
///
/// `export!(MyLoader)` expands to a private module in the invoking crate that wraps
/// `crate::MyLoader` (and its `MessageIterator`) in `RefCell`s, because the generated guest
/// traits hand out `&self` while [`DataLoader`] and [`MessageIterator`] methods need `&mut self`.
/// Every error returned by the loader is converted to a `String` with `to_string()`.
#[macro_export]
// `crate::$L` must resolve in the crate that invokes the macro, so `$crate` is not wanted here.
#[allow(clippy::crate_in_macro_def)]
macro_rules! export {
    ( $L:ident ) => {
        mod __foxglove_data_loader_export {
            use crate::$L as Loader;
            use foxglove_data_loader::{loader, DataLoader, MessageIterator};
            use std::cell::RefCell;

            foxglove_data_loader::__generated::export!(
                DataLoaderWrapper with_types_in foxglove_data_loader::__generated
            );

            /// Adapter exposing the user's loader through the generated guest traits.
            struct DataLoaderWrapper {
                loader: RefCell<Loader>,
            }

            impl loader::Guest for DataLoaderWrapper {
                type DataLoader = Self;
                type MessageIterator = MessageIteratorWrapper;
            }

            impl loader::GuestDataLoader for DataLoaderWrapper {
                fn new(args: loader::DataLoaderArgs) -> Self {
                    let inner = <Loader as DataLoader>::new(args);
                    Self {
                        loader: RefCell::new(inner),
                    }
                }

                fn initialize(&self) -> Result<loader::Initialization, String> {
                    let outcome = self.loader.borrow_mut().initialize();
                    match outcome {
                        Ok(init) => Ok(init.into()),
                        Err(failure) => Err(failure.to_string()),
                    }
                }

                fn create_iterator(
                    &self,
                    args: loader::MessageIteratorArgs,
                ) -> Result<loader::MessageIterator, String> {
                    let created = self.loader.borrow_mut().create_iter(args);
                    let message_iterator = created.map_err(|failure| failure.to_string())?;
                    let wrapper = MessageIteratorWrapper {
                        message_iterator: RefCell::new(message_iterator),
                    };
                    Ok(loader::MessageIterator::new(wrapper))
                }

                fn get_backfill(
                    &self,
                    args: loader::BackfillArgs,
                ) -> Result<Vec<loader::Message>, String> {
                    let outcome = self.loader.borrow_mut().get_backfill(args);
                    outcome.map_err(|failure| failure.to_string())
                }
            }

            /// Adapter exposing the loader's message iterator through the generated guest trait.
            struct MessageIteratorWrapper {
                message_iterator: RefCell<<Loader as DataLoader>::MessageIterator>,
            }

            impl loader::GuestMessageIterator for MessageIteratorWrapper {
                fn next(&self) -> Option<Result<loader::Message, String>> {
                    // `?` ends iteration as soon as the wrapped iterator yields `None`.
                    let item = self.message_iterator.borrow_mut().next()?;
                    Some(item.map_err(|failure| failure.to_string()))
                }
            }
        }
    };
}
use anyhow::anyhow;
use std::collections::BTreeMap;
use std::{cell::RefCell, rc::Rc};
#[doc(inline)]
pub use __generated::exports::foxglove::loader::loader::{
BackfillArgs, Channel, ChannelId, DataLoaderArgs, Message, MessageIteratorArgs, Schema,
SchemaId, Severity, TimeRange,
};
#[doc(inline)]
pub use __generated::foxglove::loader::{console, reader};
#[doc(hidden)]
pub use __generated::exports::foxglove::loader::loader;
/// Lets a [`reader::Reader`] be used wherever a [`std::io::Read`] is expected.
impl std::io::Read for reader::Reader {
    /// Forwards to the inherent `Reader::read`, passing the destination buffer as an
    /// address/length pair, and reports the value it returns as the number of bytes read.
    ///
    /// This implementation never returns `Err`.
    fn read(&mut self, dst: &mut [u8]) -> Result<usize, std::io::Error> {
        // The buffer is handed over as a raw address so that it can be filled in place
        // (presumably by the host side of the binding). A pointer used for writing must come
        // from `as_mut_ptr`: `as_ptr` documents that the memory must never be written through
        // it or any pointer derived from it.
        let ptr = dst.as_mut_ptr() as _;
        let len = dst.len() as _;
        Ok(reader::Reader::read(self, ptr, len) as usize)
    }
}
/// Lets a [`reader::Reader`] be used wherever a [`std::io::Seek`] is expected.
impl std::io::Seek for reader::Reader {
    /// Moves the reader to the requested position and returns the position it reports afterwards.
    ///
    /// # Errors
    ///
    /// Returns [`std::io::ErrorKind::InvalidInput`] when a relative seek would land before byte
    /// zero or past `u64::MAX`. The reader is left untouched in that case.
    fn seek(&mut self, seek: std::io::SeekFrom) -> Result<u64, std::io::Error> {
        // Resolve the absolute target with checked arithmetic: a plain `as u64` cast would turn
        // a negative result into a huge forward seek instead of an error.
        let target = match seek {
            std::io::SeekFrom::Start(offset) => Some(offset),
            std::io::SeekFrom::End(offset) => {
                reader::Reader::size(self).checked_add_signed(offset)
            }
            std::io::SeekFrom::Current(offset) => {
                reader::Reader::position(self).checked_add_signed(offset)
            }
        };
        let target = target.ok_or_else(|| {
            std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "invalid seek to a negative or overflowing position",
            )
        })?;
        reader::Reader::seek(self, target);
        Ok(reader::Reader::position(self))
    }
}
#[derive(Clone, Debug)]
pub struct Problem(loader::Problem);
impl Problem {
pub fn new(severity: Severity, message: impl Into<String>) -> Self {
Self(loader::Problem {
severity,
message: message.into(),
tip: None,
})
}
pub fn tip(mut self, tip: impl Into<String>) -> Self {
self.0.tip = Some(tip.into());
self
}
pub fn error(message: impl Into<String>) -> Self {
Self::new(Severity::Error, message)
}
pub fn warn(message: impl Into<String>) -> Self {
Self::new(Severity::Warn, message)
}
pub fn info(message: impl Into<String>) -> Self {
Self::new(Severity::Info, message)
}
fn into_inner(self) -> loader::Problem {
self.0
}
}
impl<T: Into<String>> From<T> for Problem {
fn from(value: T) -> Self {
Self::error(value)
}
}
/// The value a [`DataLoader`] returns from `initialize`: the channels and schemas it offers,
/// the time range it covers, and any problems found along the way.
///
/// Fields are private; build one with [`Initialization::builder`].
#[derive(Debug, Clone, Default)]
pub struct Initialization {
/// Channels, already converted to the generated record type.
channels: Vec<loader::Channel>,
/// Schemas, already converted to the generated record type.
schemas: Vec<loader::Schema>,
/// Start and end time reported for the data.
time_range: TimeRange,
/// Diagnostics collected while initializing.
problems: Vec<Problem>,
}
impl From<Initialization> for loader::Initialization {
fn from(init: Initialization) -> loader::Initialization {
loader::Initialization {
channels: init.channels,
schemas: init.schemas,
time_range: init.time_range,
problems: init.problems.into_iter().map(|p| p.into_inner()).collect(),
}
}
}
impl Initialization {
pub fn builder() -> InitializationBuilder {
InitializationBuilder::default()
}
}
/// Bookkeeping for the schemas registered on an [`InitializationBuilder`].
#[derive(Debug)]
struct SchemaManager {
/// Cursor from which `get_free_id` starts looking for an unused id.
next_schema_id: u16,
/// Registered schemas keyed by id.
schemas: BTreeMap<u16, LinkedSchema>,
}
impl Default for SchemaManager {
fn default() -> Self {
Self {
next_schema_id: 1,
schemas: Default::default(),
}
}
}
impl SchemaManager {
    /// Returns a schema id that is not registered yet and advances the cursor past it.
    ///
    /// The cursor wraps from `u16::MAX` back to 1 rather than overflowing, so ids freed up
    /// below the cursor (or never used) can still be handed out, and id 0 — which
    /// `add_schema_with_id` rejects — is never produced.
    ///
    /// # Panics
    ///
    /// Panics when every id in `1..=u16::MAX` is already registered.
    fn get_free_id(&mut self) -> u16 {
        // One probe per non-zero id is enough to visit the whole id space exactly once.
        for _ in 0..u16::MAX {
            let candidate = self.next_schema_id;
            self.next_schema_id = candidate.checked_add(1).unwrap_or(1);
            if !self.schemas.contains_key(&candidate) {
                return candidate;
            }
        }
        panic!("all schema ids are in use");
    }

    /// Registers `schema` under `id` and returns a handle to it, or `None` when `id` is taken.
    ///
    /// The returned [`LinkedSchema`] shares `channels` so channels can later be added through
    /// it. Its message encoding starts out empty. A clone of the handle is kept in the manager.
    fn add_schema(
        &mut self,
        id: u16,
        schema: foxglove::Schema,
        channels: &Rc<RefCell<ChannelManager>>,
    ) -> Option<LinkedSchema> {
        // The entry API checks for a collision and inserts with a single map lookup.
        match self.schemas.entry(id) {
            std::collections::btree_map::Entry::Occupied(_) => None,
            std::collections::btree_map::Entry::Vacant(slot) => {
                let linked = LinkedSchema {
                    id,
                    schema,
                    channels: Rc::clone(channels),
                    message_encoding: String::new(),
                };
                slot.insert(linked.clone());
                Some(linked)
            }
        }
    }
}
/// Bookkeeping for the channels registered on an [`InitializationBuilder`].
#[derive(Debug)]
struct ChannelManager {
/// Cursor from which `get_free_id` starts looking for an unused id.
next_channel_id: u16,
/// Registered channels keyed by id.
channels: BTreeMap<u16, LinkedChannel>,
}
impl Default for ChannelManager {
fn default() -> Self {
Self {
next_channel_id: 1,
channels: Default::default(),
}
}
}
impl ChannelManager {
    /// Registers a channel named `topic_name` under `id` and returns a handle to it, or `None`
    /// when `id` is already taken.
    ///
    /// The new channel has no schema, an empty message encoding and no message count. The
    /// handle and the clone kept in the manager share those three settings through `Rc`, so
    /// later calls on the handle are visible to the manager.
    fn add_channel(&mut self, id: u16, topic_name: impl Into<String>) -> Option<LinkedChannel> {
        // The entry API checks for a collision and inserts with a single map lookup.
        match self.channels.entry(id) {
            std::collections::btree_map::Entry::Occupied(_) => None,
            std::collections::btree_map::Entry::Vacant(slot) => {
                let channel = LinkedChannel {
                    id,
                    schema_id: Rc::new(RefCell::new(None)),
                    topic_name: topic_name.into(),
                    message_encoding: Rc::new(RefCell::new(String::new())),
                    message_count: Rc::new(RefCell::new(None)),
                };
                slot.insert(channel.clone());
                Some(channel)
            }
        }
    }

    /// Returns a channel id that is not registered yet and advances the cursor past it.
    ///
    /// The cursor wraps from `u16::MAX` back to 1 (its initial value) rather than overflowing,
    /// so unused ids below the cursor can still be handed out.
    ///
    /// # Panics
    ///
    /// Panics when every id in `1..=u16::MAX` is already registered.
    fn get_free_id(&mut self) -> u16 {
        // One probe per non-zero id is enough to visit the whole id space exactly once.
        for _ in 0..u16::MAX {
            let candidate = self.next_channel_id;
            self.next_channel_id = candidate.checked_add(1).unwrap_or(1);
            if !self.channels.contains_key(&candidate) {
                return candidate;
            }
        }
        panic!("all channel ids are in use");
    }
}
/// Builder for an [`Initialization`].
///
/// The channel and schema registries live behind `Rc<RefCell<..>>`, so a cloned builder (and
/// every [`LinkedSchema`] handed out by it) refers to the same registries rather than to copies.
#[derive(Debug, Clone)]
pub struct InitializationBuilder {
/// Shared registry of channels.
channels: Rc<RefCell<ChannelManager>>,
/// Shared registry of schemas.
schemas: Rc<RefCell<SchemaManager>>,
/// Time range that will be reported.
time_range: loader::TimeRange,
/// Problems that will be reported.
problems: Vec<Problem>,
}
impl Default for InitializationBuilder {
    /// An empty builder: fresh registries, the default time range and no problems.
    fn default() -> Self {
        let channels = Rc::new(RefCell::new(ChannelManager::default()));
        let schemas = Rc::new(RefCell::new(SchemaManager::default()));
        Self {
            channels,
            schemas,
            time_range: Default::default(),
            problems: Vec::new(),
        }
    }
}
#[allow(clippy::derivable_impls)]
impl Default for TimeRange {
fn default() -> Self {
TimeRange {
start_time: 0,
end_time: 0,
}
}
}
impl InitializationBuilder {
    /// Sets the whole time range at once.
    pub fn time_range(mut self, time_range: TimeRange) -> Self {
        self.time_range = time_range;
        self
    }

    /// Sets only the start of the time range.
    pub fn start_time(mut self, start_time: u64) -> Self {
        self.time_range.start_time = start_time;
        self
    }

    /// Sets only the end of the time range.
    pub fn end_time(mut self, end_time: u64) -> Self {
        self.time_range.end_time = end_time;
        self
    }

    /// Adds a channel without a schema under the next free channel id.
    pub fn add_channel(&mut self, topic_name: &str) -> LinkedChannel {
        // The braces end the `RefCell` borrow before `add_channel_with_id` borrows again.
        let id = { self.channels.borrow_mut().get_free_id() };
        self.add_channel_with_id(id, topic_name)
            .expect("id was checked to be free above")
    }

    /// Adds a channel without a schema under `id`; returns `None` when `id` is already in use.
    pub fn add_channel_with_id(&mut self, id: u16, topic_name: &str) -> Option<LinkedChannel> {
        let mut channels = self.channels.borrow_mut();
        channels.add_channel(id, topic_name)
    }

    /// Adds `schema` under the next free schema id.
    pub fn add_schema(&mut self, schema: foxglove::Schema) -> LinkedSchema {
        let id = { self.schemas.borrow_mut().get_free_id() };
        self.add_schema_with_id(id, schema)
            .expect("id was checked to be free above")
    }

    /// Adds `schema` under `id`; returns `None` when `id` is already in use.
    ///
    /// # Panics
    ///
    /// Panics if `id` is zero.
    pub fn add_schema_with_id(
        &mut self,
        id: u16,
        schema: foxglove::Schema,
    ) -> Option<LinkedSchema> {
        assert!(id > 0, "schema id cannot be zero");
        let mut schemas = self.schemas.borrow_mut();
        schemas.add_schema(id, schema, &self.channels)
    }

    /// Adds the schema of `T` under the next free schema id and sets the returned handle's
    /// message encoding to `T`'s.
    ///
    /// # Errors
    ///
    /// Fails when `T::get_schema()` returns `None`.
    pub fn add_encode<T: foxglove::Encode>(&mut self) -> Result<LinkedSchema, anyhow::Error> {
        // Resolve the schema before reserving an id so that a failure does not use one up.
        let schema = T::get_schema().ok_or_else(|| anyhow!("Failed to get schema"))?;
        Ok(self
            .add_schema(schema)
            .message_encoding(T::get_message_encoding()))
    }

    /// Adds the schema of `T` under `id` and sets the returned handle's message encoding to
    /// `T`'s. Yields `Ok(None)` when `id` is already in use.
    ///
    /// # Errors
    ///
    /// Fails when `T::get_schema()` returns `None`.
    ///
    /// # Panics
    ///
    /// Panics if `id` is zero.
    pub fn add_encode_with_id<T: foxglove::Encode>(
        &mut self,
        id: u16,
    ) -> Result<Option<LinkedSchema>, anyhow::Error> {
        // `ok_or_else` builds the error only on the failure path.
        let schema = T::get_schema().ok_or_else(|| anyhow!("Failed to get schema"))?;
        let linked_schema = self
            .add_schema_with_id(id, schema)
            .map(|s| s.message_encoding(T::get_message_encoding()));
        Ok(linked_schema)
    }

    /// Records a problem to report with the initialization.
    pub fn add_problem(mut self, problem: impl Into<Problem>) -> Self {
        self.problems.push(problem.into());
        self
    }

    /// Snapshots the registered schemas and channels (in ascending id order, as stored in the
    /// `BTreeMap`s) and produces the final [`Initialization`].
    pub fn build(self) -> Initialization {
        let schemas = self
            .schemas
            .borrow()
            .schemas
            .values()
            .cloned()
            .map(Schema::from)
            .collect();
        let channels = self
            .channels
            .borrow()
            .channels
            .values()
            .cloned()
            .map(Channel::from)
            .collect();
        Initialization {
            channels,
            schemas,
            time_range: self.time_range,
            problems: self.problems,
        }
    }
}
/// Handle to a schema registered on an [`InitializationBuilder`].
///
/// It keeps a reference to the builder's channel registry so that channels created through it
/// are registered there and linked to this schema.
#[derive(Debug, Clone)]
pub struct LinkedSchema {
/// Id the schema was registered under.
id: SchemaId,
/// The schema definition itself.
schema: foxglove::Schema,
/// Channel registry shared with the builder.
channels: Rc<RefCell<ChannelManager>>,
/// Message encoding given to channels created through this handle.
message_encoding: String,
}
impl LinkedSchema {
    /// The id this schema was registered under.
    pub fn id(&self) -> SchemaId {
        self.id
    }

    /// Registers a channel under `id` that uses this schema and this handle's message encoding.
    ///
    /// Returns `None` when `id` is already in use.
    pub fn add_channel_with_id(&self, id: u16, topic_name: &str) -> Option<LinkedChannel> {
        // Bail out early on an id collision; the registry borrow ends with this statement.
        let fresh = self.channels.borrow_mut().add_channel(id, topic_name)?;
        let encoding = self.message_encoding.clone();
        Some(fresh.message_encoding(encoding).schema(self))
    }

    /// Registers a channel under the next free channel id that uses this schema and this
    /// handle's message encoding.
    pub fn add_channel(&self, topic_name: &str) -> LinkedChannel {
        let free_id = self.channels.borrow_mut().get_free_id();
        let added = self.add_channel_with_id(free_id, topic_name);
        added.expect("id was checked to be free above")
    }

    /// Returns the handle with its message encoding replaced.
    pub fn message_encoding(self, message_encoding: impl Into<String>) -> Self {
        Self {
            message_encoding: message_encoding.into(),
            ..self
        }
    }
}
/// Handle to a channel registered on an [`InitializationBuilder`].
///
/// The schema id, message encoding and message count sit behind `Rc<RefCell<..>>`, so clones of
/// a handle (including the one kept in the channel registry) share those settings.
#[derive(Debug, Clone)]
pub struct LinkedChannel {
/// Id the channel was registered under.
id: ChannelId,
/// Id of the linked schema, if any (shared between clones).
schema_id: Rc<RefCell<Option<SchemaId>>>,
/// Topic name given at registration.
topic_name: String,
/// Message encoding (shared between clones).
message_encoding: Rc<RefCell<String>>,
/// Message count, if one was provided (shared between clones).
message_count: Rc<RefCell<Option<u64>>>,
}
impl LinkedChannel {
    /// The id this channel was registered under.
    pub fn id(&self) -> ChannelId {
        self.id
    }

    /// Sets the message count; the change is seen by every clone of this handle.
    pub fn message_count(self, message_count: u64) -> Self {
        *self.message_count.borrow_mut() = Some(message_count);
        self
    }

    /// Sets the message encoding; the change is seen by every clone of this handle.
    pub fn message_encoding(self, message_encoding: impl Into<String>) -> Self {
        *self.message_encoding.borrow_mut() = message_encoding.into();
        self
    }

    /// Links the channel to `linked_schema`; the change is seen by every clone of this handle.
    pub fn schema(self, linked_schema: &LinkedSchema) -> Self {
        *self.schema_id.borrow_mut() = Some(linked_schema.id);
        self
    }
}
/// Snapshots the current state of a [`LinkedChannel`] into the generated channel record.
impl From<LinkedChannel> for loader::Channel {
    fn from(ch: LinkedChannel) -> loader::Channel {
        // Read the shared cells in separate statements so each borrow ends immediately.
        let schema_id = *ch.schema_id.borrow();
        let message_encoding = ch.message_encoding.borrow().clone();
        let message_count = *ch.message_count.borrow();
        loader::Channel {
            id: ch.id,
            schema_id,
            topic_name: ch.topic_name,
            message_encoding,
            message_count,
        }
    }
}
/// Converts a [`LinkedSchema`] into the generated schema record, copying the schema data into
/// an owned `Vec`.
impl From<LinkedSchema> for loader::Schema {
    fn from(value: LinkedSchema) -> Self {
        let id = value.id;
        let definition = value.schema;
        Self {
            id,
            name: definition.name,
            encoding: definition.encoding,
            data: definition.data.to_vec(),
        }
    }
}
/// The interface a data loader implements; the `export!` macro wraps an implementation of this
/// trait and exposes it through the generated guest bindings.
pub trait DataLoader: 'static + Sized {
/// Iterator type returned by [`DataLoader::create_iter`].
type MessageIterator: MessageIterator;
/// Error type for the fallible methods. The `export!` wrapper turns it into a `String` with
/// `to_string()`.
type Error: Into<Box<dyn std::error::Error>>;
/// Constructs the loader from the arguments supplied by the caller.
fn new(args: DataLoaderArgs) -> Self;
/// Produces the [`Initialization`] describing what this loader provides.
fn initialize(&mut self) -> Result<Initialization, Self::Error>;
/// Creates a message iterator for the given arguments.
fn create_iter(
&mut self,
args: loader::MessageIteratorArgs,
) -> Result<Self::MessageIterator, Self::Error>;
/// Returns backfill messages for the given arguments.
///
/// The default implementation returns an empty list.
fn get_backfill(
&mut self,
_args: loader::BackfillArgs,
) -> Result<Vec<loader::Message>, Self::Error> {
Ok(Vec::new())
}
}
/// A fallible stream of [`Message`]s produced by [`DataLoader::create_iter`].
pub trait MessageIterator: 'static + Sized {
/// Error type yielded by [`MessageIterator::next`]. The `export!` wrapper turns it into a
/// `String` with `to_string()`.
type Error: Into<Box<dyn std::error::Error>>;
/// Returns the next message or error; `None` means there is nothing more to yield.
fn next(&mut self) -> Option<Result<Message, Self::Error>>;
}
/// Bindings generated by `wit_bindgen` for the `host` world defined under `./wit`.
///
/// Hidden from the docs; the public names are re-exported at the top of this file, and the
/// crate's own `export!` macro reaches the generated `export` macro through this module.
#[doc(hidden)]
pub mod __generated {
wit_bindgen::generate!({
world: "host",
// Name of the generated export macro, invoked as `__generated::export!`.
export_macro_name: "export",
// Make that macro `pub` so it is usable from the crate that invokes the SDK's `export!`.
pub_export_macro: true,
path: "./wit",
});
}
#[cfg(test)]
mod tests;