Skip to main content

co_api/library/
reduce.rs

1// SPDX-License-Identifier: AGPL-3.0-only
2// Copyright (C) 2026 1io BRANDGUARDIAN GmbH
3
4use crate::{
5	library::{
6		data::{read_input_sync, write_output_sync},
7		wasm_storage::WasmStorage,
8	},
9	Reducer,
10};
11use co_primitives::{CoreBlockStorage, RawCid, ReducerInput, ReducerOutput, Tags};
12use futures::{executor::LocalPool, future::LocalBoxFuture, task::LocalSpawnExt, FutureExt};
13use serde::de::DeserializeOwned;
14use std::sync::Arc;
15
16#[allow(clippy::type_complexity)]
17pub struct ReducerRef(
18	Arc<dyn Fn(ReducerInput, CoreBlockStorage) -> LocalBoxFuture<'static, ReducerOutput> + Sync + Send + 'static>,
19);
20impl ReducerRef {
21	pub fn new<R, A>() -> Self
22	where
23		R: Reducer<A> + 'static,
24		A: Clone + DeserializeOwned + 'static,
25	{
26		Self(Arc::new(|input, storage| {
27			async move {
28				let state = input.state;
29				match R::reduce(state.into(), input.action.into(), &storage).await {
30					Ok(link) => ReducerOutput { state: Some(link.into()), error: None, tags: Tags::default() },
31					Err(err) => ReducerOutput { state, error: Some(err.to_string()), tags: Tags::default() },
32				}
33			}
34			.boxed_local()
35		}))
36	}
37
38	pub fn execute_blocking(&self, input: ReducerInput, storage: CoreBlockStorage) -> ReducerOutput {
39		let closure = self.0.clone();
40		let mut pool = LocalPool::new();
41		let handle = pool
42			.spawner()
43			.spawn_local_with_handle(async move { closure(input, storage).await })
44			.expect("future to execute");
45		pool.run_until(handle)
46	}
47
48	pub async fn execute_async(&self, input: ReducerInput, storage: CoreBlockStorage) -> ReducerOutput {
49		(self.0)(input, storage).await
50	}
51}
52impl Clone for ReducerRef {
53	fn clone(&self) -> Self {
54		Self(self.0.clone())
55	}
56}
57
58pub fn reduce<R, A>(input: &RawCid, output: &mut RawCid)
59where
60	R: Reducer<A> + 'static,
61	A: Clone + DeserializeOwned + 'static,
62{
63	let mut storage = WasmStorage::new();
64	let block_storage = CoreBlockStorage::new(storage.clone(), false);
65
66	// input
67	let reducer_input: ReducerInput = read_input_sync(&storage, input);
68
69	// reduce
70	let reducer_output = ReducerRef::new::<R, A>().execute_blocking(reducer_input, block_storage);
71
72	// output
73	write_output_sync(&mut storage, &reducer_output, output);
74}