1use futures::StreamExt;
2
3use tonic::Status;
4
5use crate::input::select::Select;
6use crate::output::upsert::Upsert;
7
8pub async fn upsert_selected<I, O, S, U, C>(
9 ibucket: I,
10 obucket: O,
11 sel: &S,
12 ups: &U,
13 conv: &C,
14) -> Result<u64, Status>
15where
16 S: Select<Bucket = I>,
17 U: Upsert<Bucket = O>,
18 C: Fn(<S as Select>::Row) -> Result<<U as Upsert>::Row, Status>,
19{
20 let rows = sel.all(ibucket).await?;
21
22 let converted = rows.map(|r| r.and_then(conv));
23
24 ups.upsert(obucket, converted).await
25}