use std::{sync::Arc, thread};
use futures_util::{Future, future::BoxFuture};
use tokio::sync::mpsc;
use crate::{Result, blocking::BlockingError};
const THREAD_NAME: &str = "longbridge-sync-runtime";
type ExecFn<Ctx> = Box<dyn FnOnce(Arc<Ctx>) -> BoxFuture<'static, ()> + Send + 'static>;
pub(crate) struct BlockingRuntime<Ctx> {
task_tx: flume::Sender<ExecFn<Ctx>>,
}
impl<Ctx> BlockingRuntime<Ctx>
where
Ctx: Send + Sync + 'static,
{
pub(crate) fn try_new<CreateCtx, PushType, PushCallback>(
create_ctx: CreateCtx,
mut push_callback: PushCallback,
) -> Result<Self>
where
CreateCtx: FnOnce() -> Result<(Ctx, mpsc::UnboundedReceiver<PushType>)> + Send + 'static,
PushCallback: FnMut(PushType) + Send + 'static,
PushType: Send + 'static,
{
let (init_tx, init_rx) = flume::unbounded();
let (task_tx, task_rx) = flume::unbounded::<ExecFn<Ctx>>();
let (push_tx, push_rx) = std::sync::mpsc::channel::<PushType>();
thread::Builder::new()
.spawn(move || {
while let Ok(event) = push_rx.recv() {
push_callback(event);
}
})
.expect("spawn thread");
thread::Builder::new()
.name(THREAD_NAME.to_string())
.spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("create tokio runtime");
let handle = rt.handle().clone();
rt.block_on(async move {
let (ctx, mut event_rx) = match create_ctx() {
Ok(res) => {
let _ = init_tx.send(Ok(()));
res
}
Err(err) => {
tracing::error!(error = %err, "failed to create quote context");
let _ = init_tx.send(Err(err));
return;
}
};
let ctx = Arc::new(ctx);
loop {
tokio::select! {
item = task_rx.recv_async() => {
match item {
Ok(f) => {
handle.spawn(f(ctx.clone()));
},
Err(_) => break,
}
}
item = event_rx.recv() => {
match item {
Some(event) => _ = push_tx.send(event),
None => break,
}
}
}
}
});
})
.expect("spawn thread");
init_rx
.recv()
.expect("recv init event")
.map(|_| Self { task_tx })
}
pub(crate) fn call<F, Fut, R>(&self, f: F) -> Result<R>
where
F: FnOnce(Arc<Ctx>) -> Fut + Send + 'static,
Fut: Future<Output = Result<R>> + Send,
R: Send + 'static,
{
let (reply_tx, reply_rx) = flume::unbounded();
self.task_tx
.send(Box::new(move |ctx| {
Box::pin(async move {
let res = f(ctx).await;
let _ = reply_tx.send(res);
})
}))
.map_err(|_| BlockingError::Closed)?;
reply_rx.recv().map_err(|_| BlockingError::Closed)?
}
}