pub use crate::channels::{ChannelDirection, ChannelItem, extract_channel_refs, is_channel_ref};
use std::sync::Arc;
use serde_json::Value;
use crate::error::IIIError;
use crate::iii::{III, RegisterFunction};
use crate::stream_provider::IStream;
use crate::types::{
Channel, StreamDeleteInput, StreamGetInput, StreamListGroupsInput, StreamListInput,
StreamSetInput,
};
pub async fn create_channel(iii: &III, buffer_size: Option<usize>) -> Result<Channel, IIIError> {
crate::iii::internal_create_channel(iii, buffer_size).await
}
pub fn create_stream<S>(iii: &III, stream_name: impl Into<String>, stream: S)
where
S: IStream,
{
let stream: Arc<S> = Arc::new(stream);
let stream_name = stream_name.into();
let s = stream.clone();
iii.register_function(
format!("stream::get({stream_name})"),
RegisterFunction::new_async(move |input: Value| {
let s = s.clone();
async move {
let typed: StreamGetInput =
serde_json::from_value(input).map_err(|e| IIIError::Serde(e.to_string()))?;
let out = s.get(typed).await?;
Ok(serde_json::to_value(out).unwrap_or_default())
}
}),
);
let s = stream.clone();
iii.register_function(
format!("stream::set({stream_name})"),
RegisterFunction::new_async(move |input: Value| {
let s = s.clone();
async move {
let typed: StreamSetInput =
serde_json::from_value(input).map_err(|e| IIIError::Serde(e.to_string()))?;
let out = s.set(typed).await?;
Ok(serde_json::to_value(out).unwrap_or_default())
}
}),
);
let s = stream.clone();
iii.register_function(
format!("stream::delete({stream_name})"),
RegisterFunction::new_async(move |input: Value| {
let s = s.clone();
async move {
let typed: StreamDeleteInput =
serde_json::from_value(input).map_err(|e| IIIError::Serde(e.to_string()))?;
let out = s.delete(typed).await?;
Ok(serde_json::to_value(out).unwrap_or_default())
}
}),
);
let s = stream.clone();
iii.register_function(
format!("stream::list({stream_name})"),
RegisterFunction::new_async(move |input: Value| {
let s = s.clone();
async move {
let typed: StreamListInput =
serde_json::from_value(input).map_err(|e| IIIError::Serde(e.to_string()))?;
let out = s.list(typed).await?;
Ok(serde_json::to_value(out).unwrap_or_default())
}
}),
);
let s = stream.clone();
iii.register_function(
format!("stream::list_groups({stream_name})"),
RegisterFunction::new_async(move |input: Value| {
let s = s.clone();
async move {
let typed: StreamListGroupsInput =
serde_json::from_value(input).map_err(|e| IIIError::Serde(e.to_string()))?;
let out = s.list_groups(typed).await?;
Ok(serde_json::to_value(out).unwrap_or_default())
}
}),
);
}