1use crate::{
5 library::{
6 data::{read_input_sync, write_output_sync},
7 wasm_storage::WasmStorage,
8 },
9 Reducer,
10};
11use co_primitives::{CoreBlockStorage, RawCid, ReducerInput, ReducerOutput, Tags};
12use futures::{executor::LocalPool, future::LocalBoxFuture, task::LocalSpawnExt, FutureExt};
13use serde::de::DeserializeOwned;
14use std::sync::Arc;
15
16#[allow(clippy::type_complexity)]
17pub struct ReducerRef(
18 Arc<dyn Fn(ReducerInput, CoreBlockStorage) -> LocalBoxFuture<'static, ReducerOutput> + Sync + Send + 'static>,
19);
20impl ReducerRef {
21 pub fn new<R, A>() -> Self
22 where
23 R: Reducer<A> + 'static,
24 A: Clone + DeserializeOwned + 'static,
25 {
26 Self(Arc::new(|input, storage| {
27 async move {
28 let state = input.state;
29 match R::reduce(state.into(), input.action.into(), &storage).await {
30 Ok(link) => ReducerOutput { state: Some(link.into()), error: None, tags: Tags::default() },
31 Err(err) => ReducerOutput { state, error: Some(err.to_string()), tags: Tags::default() },
32 }
33 }
34 .boxed_local()
35 }))
36 }
37
38 pub fn execute_blocking(&self, input: ReducerInput, storage: CoreBlockStorage) -> ReducerOutput {
39 let closure = self.0.clone();
40 let mut pool = LocalPool::new();
41 let handle = pool
42 .spawner()
43 .spawn_local_with_handle(async move { closure(input, storage).await })
44 .expect("future to execute");
45 pool.run_until(handle)
46 }
47
48 pub async fn execute_async(&self, input: ReducerInput, storage: CoreBlockStorage) -> ReducerOutput {
49 (self.0)(input, storage).await
50 }
51}
52impl Clone for ReducerRef {
53 fn clone(&self) -> Self {
54 Self(self.0.clone())
55 }
56}
57
58pub fn reduce<R, A>(input: &RawCid, output: &mut RawCid)
59where
60 R: Reducer<A> + 'static,
61 A: Clone + DeserializeOwned + 'static,
62{
63 let mut storage = WasmStorage::new();
64 let block_storage = CoreBlockStorage::new(storage.clone(), false);
65
66 let reducer_input: ReducerInput = read_input_sync(&storage, input);
68
69 let reducer_output = ReducerRef::new::<R, A>().execute_blocking(reducer_input, block_storage);
71
72 write_output_sync(&mut storage, &reducer_output, output);
74}