use futures::{Stream, StreamExt};
/// Consumes `stream`, folding its chunks into a single aggregate while a
/// spawned task writes the running aggregate through `log_writer`.
///
/// Every successfully received chunk is folded into a local aggregate with
/// `push` (the first chunk is cloned to seed it) and then forwarded over an
/// unbounded channel to a task running [`writer_loop`]. The stream is never
/// held up by writes; chunks that arrive while a write is in flight queue up
/// in the channel and are combined by the writer.
///
/// Consumption stops at the first `Err` item of the stream, or as soon as a
/// chunk can no longer be handed to the writer task. The sender is then
/// dropped and the writer task is awaited so that it can finish draining
/// whatever is still queued.
///
/// # Errors
///
/// Checked in this order:
/// - `Error::WriterPanic` if awaiting the writer task fails (any join error
///   is mapped to this variant),
/// - the error returned by the writer task itself,
/// - the first error yielded by `stream`,
/// - `Error::EmptyStream` if the stream ended without yielding any chunk.
pub(crate) async fn consume_with_coalesced_writes<C, F>(
    stream: impl Stream<Item = Result<C, crate::error::Error>>,
    log_writer: objectiveai_sdk::filesystem::logs::LogWriter<C>,
    push: F,
    handle: objectiveai_cli_sdk::output::Handle,
) -> Result<C, crate::error::Error>
where
    C: Clone + Send + Sync + 'static,
    F: Fn(&mut C, &C) + Clone + Send + 'static,
{
    let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<C>();

    // The writer task keeps its own aggregate, so it needs its own copy of `push`.
    let writer_push = push.clone();
    let writer_handle = tokio::spawn(async move {
        writer_loop(rx, log_writer, writer_push, handle).await
    });

    let mut main_agg: Option<C> = None;
    let mut stream_err: Option<crate::error::Error> = None;

    futures::pin_mut!(stream);
    while let Some(item) = stream.next().await {
        match item {
            Ok(chunk) => {
                match &mut main_agg {
                    Some(a) => push(a, &chunk),
                    None => main_agg = Some(chunk.clone()),
                }
                // A failed send means the receiver is gone, i.e. the writer
                // task has already ended while the sender was still alive.
                // Its outcome is surfaced below and takes precedence over
                // anything else, so draining the rest of the stream would
                // only be wasted work.
                if tx.send(chunk).is_err() {
                    break;
                }
            }
            Err(e) => {
                stream_err = Some(e);
                break;
            }
        }
    }

    // Closing the channel lets the writer flush what is queued and return.
    drop(tx);

    let writer_result = writer_handle
        .await
        .map_err(|_| crate::error::Error::WriterPanic)?;
    writer_result?;

    if let Some(e) = stream_err {
        return Err(e);
    }
    main_agg.ok_or(crate::error::Error::EmptyStream)
}
/// Receives chunks from `rx`, folds them into one aggregate and writes it out.
///
/// Each round waits for one chunk with `recv`, then pulls everything that is
/// already queued with `try_recv`, folding each chunk into the aggregate via
/// `push`. The combined aggregate is then written once with
/// `log_writer.write`, so a burst of chunks costs a single write.
///
/// After a write, the first time `log_writer.primary_id()` yields an id it is
/// passed to `emit_log_stream_ready` together with `handle`; once that has
/// happened the id is not queried again.
///
/// # Errors
///
/// Propagates the first error returned by `log_writer.write`. Returns
/// `Ok(())` once `recv` yields `None`.
async fn writer_loop<C, F>(
    mut rx: tokio::sync::mpsc::UnboundedReceiver<C>,
    mut log_writer: objectiveai_sdk::filesystem::logs::LogWriter<C>,
    push: F,
    handle: objectiveai_cli_sdk::output::Handle,
) -> Result<(), crate::error::Error>
where
    F: Fn(&mut C, &C),
{
    let mut aggregate: Option<C> = None;
    let mut announced = false;

    while let Some(head) = rx.recv().await {
        // Seed the aggregate with the very first chunk, fold later ones in.
        if let Some(acc) = &mut aggregate {
            push(acc, &head);
        } else {
            aggregate = Some(head);
        }

        // Absorb whatever is already waiting in the channel before writing.
        if let Some(acc) = &mut aggregate {
            while let Ok(queued) = rx.try_recv() {
                push(acc, &queued);
            }
        }

        if let Some(snapshot) = &aggregate {
            log_writer.write(snapshot).await?;
        }

        if announced {
            continue;
        }
        if let Some(id) = log_writer.primary_id() {
            crate::log_line::emit_log_stream_ready(id, &handle).await;
            announced = true;
        }
    }

    Ok(())
}