1use anyhow::anyhow;
5use cid::Cid;
6use co_api::{
7 co, BlockStorage, BlockStorageExt, CoList, CoListIndex, CoMap, CoTryStreamExt, CoreBlockStorage, IsDefault,
8 LazyTransaction, Link, OptionLink, Reducer, ReducerAction, Tags,
9};
10use futures::{pin_mut, FutureExt, TryStreamExt};
11use std::future::ready;
12
13pub type ListName = String;
14pub type TaskId = String;
15
16#[co]
18pub enum BoardAction {
19 BoardRename(String),
20 BoardTagsInsert(Tags),
21 BoardTagsRemove(Tags),
22 ListCreate { list: List, after: Option<ListName> },
23 ListArrange { name: ListName, after: Option<ListName> },
24 ListDelete { name: ListName, move_tasks_to_list: Option<ListName> },
25 ListTagsInsert(ListName, Tags),
26 ListTagsRemove(ListName, Tags),
27 TaskCreate { list: ListName, task: Task, after: Option<TaskId> },
30 TaskMove { from_list: Option<ListName>, list: ListName, task: TaskId, after: Option<TaskId>, lock: TaskLock },
31 TaskArrange { task: TaskId, after: Option<TaskId> },
32 TaskDelete(TaskId),
33 TaskRename(TaskId, String),
34 TaskPayloadChange(TaskId, Option<Cid>),
35 TaskTagsInsert(TaskId, Tags),
36 TaskTagsRemove(TaskId, Tags),
37}
38
39#[co(state)]
40pub struct Board {
41 #[serde(rename = "n", default, skip_serializing_if = "String::is_empty")]
43 pub name: String,
44
45 #[serde(rename = "l", default, skip_serializing_if = "CoList::is_empty")]
47 pub lists: CoList<List>,
48
49 #[serde(rename = "t", default, skip_serializing_if = "Tags::is_empty")]
51 pub tags: Tags,
52
53 #[serde(rename = "i", default, skip_serializing_if = "CoMap::is_empty")]
55 pub tasks: CoMap<TaskId, Task>,
56}
57impl Reducer<BoardAction> for Board {
58 async fn reduce(
59 state_link: OptionLink<Self>,
60 event_link: Link<ReducerAction<BoardAction>>,
61 storage: &CoreBlockStorage,
62 ) -> Result<Link<Self>, anyhow::Error> {
63 let event = storage.get_value(&event_link).await?;
64 let mut state = storage.get_value_or_default(&state_link).await?;
65 reduce(storage, &mut state, event.payload).await?;
66 Ok(storage.set_value(&state).await?)
67 }
68}
69
70#[co]
71pub struct List {
72 #[serde(rename = "n")]
74 pub name: ListName,
75
76 #[serde(rename = "i", default, skip_serializing_if = "CoList::is_empty")]
78 pub tasks: CoList<TaskId>,
79
80 #[serde(rename = "t", default, skip_serializing_if = "Tags::is_empty")]
82 pub tags: Tags,
83}
84impl List {
85 pub fn new(name: impl Into<ListName>) -> Self {
86 Self { name: name.into(), tags: Default::default(), tasks: Default::default() }
87 }
88}
89
90#[co]
91pub struct Task {
92 #[serde(rename = "u")]
94 pub id: TaskId,
95
96 #[serde(rename = "n")]
98 pub name: String,
99
100 #[serde(rename = "t", default, skip_serializing_if = "Tags::is_empty")]
102 pub tags: Tags,
103
104 #[serde(rename = "p", default, skip_serializing_if = "Option::is_none")]
106 pub payload: Option<Cid>,
107
108 #[serde(rename = "l", default, skip_serializing_if = "IsDefault::is_default")]
110 pub lock: Option<String>,
111}
112
113#[co]
114#[derive(Default)]
115pub enum TaskLock {
116 #[default]
119 None,
120
121 Force,
123
124 Lock(String),
127
128 Unlock(String),
131}
132
133async fn reduce<S>(storage: &S, state: &mut Board, action: BoardAction) -> Result<(), anyhow::Error>
134where
135 S: BlockStorage + Clone + 'static,
136{
137 let mut transaction = BoardTransaction {
139 storage: storage.clone(),
140 lists: LazyTransaction::new(storage.clone(), state.lists.clone()),
141 tasks: LazyTransaction::new(storage.clone(), state.tasks.clone()),
142 };
143
144 match action {
146 BoardAction::BoardRename(name) => reduce_board_rename(state, name).boxed().await?,
147 BoardAction::BoardTagsInsert(tags) => reduce_board_tags_insert(state, tags).boxed().await?,
148 BoardAction::BoardTagsRemove(tags) => reduce_board_tags_remove(state, tags).boxed().await?,
149 BoardAction::ListCreate { list, after } => reduce_list_create(&mut transaction, list, after).boxed().await?,
150 BoardAction::ListArrange { name, after } => reduce_list_arrange(&mut transaction, name, after).boxed().await?,
151 BoardAction::ListDelete { name, move_tasks_to_list } => {
152 reduce_list_delete(&mut transaction, name, move_tasks_to_list).boxed().await?
153 },
154 BoardAction::ListTagsInsert(name, tags) => {
155 reduce_list_tags_insert(&mut transaction, name, tags).boxed().await?
156 },
157 BoardAction::ListTagsRemove(name, tags) => {
158 reduce_list_tags_remove(&mut transaction, name, tags).boxed().await?
159 },
160 BoardAction::TaskCreate { list, task, after } => {
161 reduce_task_create(&mut transaction, list, task, after).boxed().await?
162 },
163 BoardAction::TaskMove { from_list, list, task, after, lock } => {
164 reduce_task_move(&mut transaction, from_list, list, task, after, lock)
165 .boxed()
166 .await?
167 },
168 BoardAction::TaskArrange { task, after } => reduce_task_arrange(&mut transaction, task, after).boxed().await?,
169 BoardAction::TaskDelete(task) => reduce_task_delete(&mut transaction, task).boxed().await?,
170 BoardAction::TaskRename(task, name) => reduce_task_rename(&mut transaction, task, name).boxed().await?,
171 BoardAction::TaskPayloadChange(task, cid) => {
172 reduce_task_payload_change(&mut transaction, task, cid).boxed().await?
173 },
174 BoardAction::TaskTagsInsert(task, tags) => {
175 reduce_task_tags_insert(&mut transaction, task, tags).boxed().await?
176 },
177 BoardAction::TaskTagsRemove(task, tags) => {
178 reduce_task_tags_remove(&mut transaction, task, tags).boxed().await?
179 },
180 }
181
182 if transaction.lists.is_mut_access() {
184 state.lists = transaction.lists.get_mut().await?.store().await?;
185 }
186 if transaction.tasks.is_mut_access() {
187 state.tasks = transaction.tasks.get_mut().await?.store().await?;
188 }
189
190 Ok(())
192}
193
194struct BoardTransaction<S>
195where
196 S: BlockStorage + Clone + 'static,
197{
198 storage: S,
199 lists: LazyTransaction<S, CoList<List>>,
200 tasks: LazyTransaction<S, CoMap<TaskId, Task>>,
201}
202impl<S> BoardTransaction<S>
203where
204 S: BlockStorage + Clone + 'static,
205{
206 async fn find_list_by_name(&mut self, name: &str) -> Result<Option<(CoListIndex, List)>, anyhow::Error> {
208 Ok(self
209 .lists
210 .get()
211 .await?
212 .stream()
213 .try_filter(|item| ready(item.1.name == name))
214 .try_first()
215 .await?)
216 }
217
218 async fn find_task_list(&mut self, id: &TaskId) -> Result<Option<(CoListIndex, List, CoListIndex)>, anyhow::Error> {
220 Ok(self
221 .lists
222 .get()
223 .await?
224 .stream()
225 .try_filter_map(|(index, list)| {
226 let storage = self.storage.clone();
227 async move {
228 Ok(list
229 .tasks
230 .stream(&storage)
231 .try_filter(|(_, task)| ready(task == id))
232 .try_first()
233 .await?
234 .map(|(task_index, _task_id)| (index, list, task_index)))
235 }
236 })
237 .try_first()
238 .await?)
239 }
240
241 async fn task(&mut self, task_id: &TaskId) -> Result<Task, anyhow::Error> {
243 self.tasks
244 .get()
245 .await?
246 .get(task_id)
247 .await?
248 .ok_or_else(|| anyhow!("Task not found: {}", task_id))
249 }
250}
251
252async fn reduce_task_tags_remove<S: BlockStorage + Clone + 'static>(
253 transaction: &mut BoardTransaction<S>,
254 task_id: TaskId,
255 tags: Tags,
256) -> Result<(), anyhow::Error> {
257 let mut task = transaction
258 .tasks
259 .get()
260 .await?
261 .get(&task_id)
262 .await?
263 .ok_or(anyhow!("Task not found: {}", task_id))?;
264
265 task.tags.clear(Some(&tags));
267
268 transaction.tasks.get_mut().await?.insert(task_id, task).await?;
270
271 Ok(())
272}
273
274async fn reduce_task_tags_insert<S: BlockStorage + Clone + 'static>(
275 transaction: &mut BoardTransaction<S>,
276 task_id: TaskId,
277 mut tags: Tags,
278) -> Result<(), anyhow::Error> {
279 let mut task = transaction
280 .tasks
281 .get()
282 .await?
283 .get(&task_id)
284 .await?
285 .ok_or(anyhow!("Task not found: {}", task_id))?;
286
287 task.tags.append(&mut tags);
289
290 transaction.tasks.get_mut().await?.insert(task_id, task).await?;
292
293 Ok(())
294}
295
296async fn reduce_task_payload_change<S: BlockStorage + Clone + 'static>(
297 transaction: &mut BoardTransaction<S>,
298 task_id: TaskId,
299 payload: Option<Cid>,
300) -> Result<(), anyhow::Error> {
301 let mut task = transaction
302 .tasks
303 .get()
304 .await?
305 .get(&task_id)
306 .await?
307 .ok_or(anyhow!("Task not found: {}", task_id))?;
308
309 if task.payload != payload {
311 task.payload = payload;
313
314 transaction.tasks.get_mut().await?.insert(task_id, task).await?;
316 }
317 Ok(())
318}
319
320async fn reduce_task_rename<S: BlockStorage + Clone + 'static>(
321 transaction: &mut BoardTransaction<S>,
322 task_id: TaskId,
323 name: String,
324) -> Result<(), anyhow::Error> {
325 let mut task = transaction
326 .tasks
327 .get()
328 .await?
329 .get(&task_id)
330 .await?
331 .ok_or(anyhow!("Task not found: {}", task_id))?;
332
333 if task.name != name {
335 task.name = name;
337
338 transaction.tasks.get_mut().await?.insert(task_id, task).await?;
340 }
341 Ok(())
342}
343
344async fn reduce_task_delete<S: BlockStorage + Clone + 'static>(
345 transaction: &mut BoardTransaction<S>,
346 task_id: TaskId,
347) -> Result<(), anyhow::Error> {
348 let (list_index, mut list, task_index) = transaction
350 .find_task_list(&task_id)
351 .await?
352 .ok_or(anyhow!("Task list not found: {}", task_id))?;
353
354 transaction
356 .tasks
357 .get_mut()
358 .await?
359 .remove(task_id.clone())
360 .await?
361 .ok_or(anyhow!("Task not found: {}", task_id))?;
362
363 list.tasks.remove(&transaction.storage, task_index).await?;
365
366 transaction.lists.get_mut().await?.set(list_index, list).await?;
368
369 Ok(())
370}
371
372async fn reduce_task_arrange<S: BlockStorage + Clone + 'static>(
373 transaction: &mut BoardTransaction<S>,
374 task_id: TaskId,
375 after: Option<TaskId>,
376) -> Result<(), anyhow::Error> {
377 let (list_index, mut list, task_index) = transaction
379 .find_task_list(&task_id)
380 .await?
381 .ok_or(anyhow!("Task list not found: {}", task_id))?;
382
383 let mut list_tasks = list.tasks.open(&transaction.storage).await?;
385 let task_after_index = if let Some(after) = &after {
386 list_tasks
387 .stream()
388 .try_filter(|item| ready(&item.1 == after))
389 .try_first()
390 .await?
391 .map(|(index, _)| index)
392 } else {
393 None
394 };
395
396 list_tasks.remove(task_index).await?;
398
399 if let Some(task_after_index) = task_after_index {
401 list_tasks.insert(task_after_index, task_id).await?;
402 } else {
403 list_tasks.push(task_id).await?;
404 }
405
406 list.tasks = list_tasks.store().await?;
408 transaction.lists.get_mut().await?.set(list_index, list).await?;
409
410 Ok(())
411}
412
413async fn reduce_task_move<S: BlockStorage + Clone + 'static>(
414 transaction: &mut BoardTransaction<S>,
415 from_list: Option<ListName>,
416 list_name: ListName,
417 task_id: TaskId,
418 after: Option<TaskId>,
419 lock: TaskLock,
420) -> Result<(), anyhow::Error> {
421 task_lock(transaction, &task_id, &lock).await?;
423
424 let (source_list_index, mut source_list, mut source_list_tasks, source_task_index) =
426 if let Some(from_list) = &from_list {
427 let (source_list_index, source_list) = transaction
428 .find_list_by_name(from_list)
429 .await?
430 .ok_or(anyhow!("List not found: {}", from_list))?;
431 let list_tasks = source_list.tasks.open(&transaction.storage).await?;
432 let source_task_index = list_tasks
433 .stream()
434 .try_filter(|item| ready(item.1 == task_id))
435 .try_first()
436 .await?
437 .map(|(index, _)| index)
438 .ok_or(anyhow!("Task not found: {} in list: {}", task_id, source_list.name))?;
439 (source_list_index, source_list, list_tasks, source_task_index)
440 } else {
441 let (source_list_index, source_list, source_task_index) = transaction
442 .find_task_list(&task_id)
443 .await?
444 .ok_or(anyhow!("Task list not found: {}", task_id))?;
445 let list_tasks = source_list.tasks.open(&transaction.storage).await?;
446 (source_list_index, source_list, list_tasks, source_task_index)
447 };
448
449 let (list_index, mut list) = transaction
451 .find_list_by_name(&list_name)
452 .await?
453 .ok_or(anyhow!("List not found: {}", list_name))?;
454 let mut list_tasks = list.tasks.open(&transaction.storage).await?;
455
456 let task_after_index = if let Some(after) = &after {
458 list_tasks
459 .stream()
460 .try_filter(|item| ready(&item.1 == after))
461 .try_first()
462 .await?
463 .map(|(index, _)| index)
464 } else {
465 None
466 };
467
468 source_list_tasks.remove(source_task_index).await?;
470
471 if let Some(task_after_index) = task_after_index {
473 list_tasks.insert(task_after_index, task_id.clone()).await?;
474 } else {
475 list_tasks.push(task_id.clone()).await?;
476 }
477
478 source_list.tasks = source_list_tasks.store().await?;
480 transaction.lists.get_mut().await?.set(source_list_index, source_list).await?;
481 list.tasks = list_tasks.store().await?;
482 transaction.lists.get_mut().await?.set(list_index, list).await?;
483
484 Ok(())
486}
487
488async fn reduce_task_create<S: BlockStorage + Clone + 'static>(
489 transaction: &mut BoardTransaction<S>,
490 list: ListName,
491 task: Task,
492 after: Option<TaskId>,
493) -> Result<(), anyhow::Error> {
494 let task_id = task.id.clone();
495
496 let (list_index, mut list) = transaction
498 .find_list_by_name(&list)
499 .await?
500 .ok_or(anyhow!("List not found: {}", list))?;
501
502 if transaction.tasks.get().await?.contains_key(&task_id).await? {
504 return Err(anyhow!("Task exists: {}", task_id));
505 }
506
507 transaction.tasks.get_mut().await?.insert(task_id.clone(), task).await?;
509
510 let mut list_tasks = list.tasks.open(&transaction.storage).await?;
512 let task_after_index = if let Some(after) = &after {
513 list_tasks
514 .stream()
515 .try_filter(|item| ready(&item.1 == after))
516 .try_first()
517 .await?
518 .map(|(index, _)| index)
519 } else {
520 None
521 };
522 if let Some(task_after_index) = task_after_index {
523 list_tasks.insert(task_after_index, task_id).await?;
524 } else {
525 list_tasks.push(task_id).await?;
526 }
527
528 list.tasks = list_tasks.store().await?;
530 transaction.lists.get_mut().await?.set(list_index, list).await?;
531
532 Ok(())
533}
534
535async fn reduce_list_tags_remove<S: BlockStorage + Clone + 'static>(
536 transaction: &mut BoardTransaction<S>,
537 name: String,
538 tags: Tags,
539) -> Result<(), anyhow::Error> {
540 let (list_index, mut list) = transaction
542 .find_list_by_name(&name)
543 .await?
544 .ok_or(anyhow!("List not found: {}", name))?;
545
546 list.tags.clear(Some(&tags));
548
549 transaction.lists.get_mut().await?.set(list_index, list).await?;
551 Ok(())
552}
553
554async fn reduce_list_tags_insert<S: BlockStorage + Clone + 'static>(
555 transaction: &mut BoardTransaction<S>,
556 name: String,
557 mut tags: Tags,
558) -> Result<(), anyhow::Error> {
559 let (list_index, mut list) = transaction
561 .find_list_by_name(&name)
562 .await?
563 .ok_or(anyhow!("List not found: {}", name))?;
564
565 list.tags.append(&mut tags);
567
568 transaction.lists.get_mut().await?.set(list_index, list).await?;
570 Ok(())
571}
572
573async fn reduce_list_delete<S: BlockStorage + Clone + 'static>(
574 transaction: &mut BoardTransaction<S>,
575 name: String,
576 move_tasks_to_list: Option<String>,
577) -> Result<(), anyhow::Error> {
578 let (list_index, list) = transaction
580 .find_list_by_name(&name)
581 .await?
582 .ok_or(anyhow!("List not found: {}", name))?;
583
584 if let Some(move_tasks_to_list) = &move_tasks_to_list {
586 let (_to_list_index, to_list) = transaction
587 .find_list_by_name(move_tasks_to_list)
588 .await?
589 .ok_or(anyhow!("List not found: {}", name))?;
590 if !list.tasks.is_empty() {
591 let storage = transaction.storage.clone();
592 let tasks = list.tasks.clone();
593 let tasks = tasks.stream(&storage);
594 pin_mut!(tasks);
595 while let Some((_, task)) = tasks.try_next().await? {
596 reduce_task_move(transaction, None, to_list.name.clone(), task, None, TaskLock::Force).await?;
597 }
598 }
599 }
600
601 transaction.lists.get_mut().await?.remove(list_index).await?;
603
604 Ok(())
605}
606
607async fn reduce_list_arrange<S: BlockStorage + Clone + 'static>(
608 transaction: &mut BoardTransaction<S>,
609 name: String,
610 after: Option<String>,
611) -> Result<(), anyhow::Error> {
612 let (list_index, list) = transaction
614 .find_list_by_name(&name)
615 .await?
616 .ok_or(anyhow!("List not found: {}", name))?;
617
618 let after_index = if let Some(after) = &after {
620 transaction.find_list_by_name(after).await?.map(|(index, _)| index)
621 } else {
622 None
623 };
624
625 transaction.lists.get_mut().await?.remove(list_index).await?;
627
628 if let Some(after_index) = after_index {
630 transaction.lists.get_mut().await?.insert(after_index, list).await?;
631 } else {
632 transaction.lists.get_mut().await?.push(list).await?;
633 }
634 Ok(())
635}
636
637async fn reduce_list_create<S: BlockStorage + Clone + 'static>(
638 transaction: &mut BoardTransaction<S>,
639 list: List,
640 after: Option<String>,
641) -> Result<(), anyhow::Error> {
642 if transaction.find_list_by_name(&list.name).await?.is_some() {
644 return Err(anyhow!("List already exists: {}", list.name));
645 }
646
647 let after_index = if let Some(after) = &after {
649 transaction.find_list_by_name(after).await?.map(|(index, _)| index)
650 } else {
651 None
652 };
653
654 if let Some(after_index) = after_index {
656 transaction.lists.get_mut().await?.insert(after_index, list).await?;
657 } else {
658 transaction.lists.get_mut().await?.push(list).await?;
659 }
660 Ok(())
661}
662
663async fn reduce_board_tags_remove(state: &mut Board, tags: Tags) -> Result<(), anyhow::Error> {
664 state.tags.clear(Some(&tags));
665 Ok(())
666}
667
668async fn reduce_board_tags_insert(state: &mut Board, mut tags: Tags) -> Result<(), anyhow::Error> {
669 state.tags.append(&mut tags);
670 Ok(())
671}
672
673async fn reduce_board_rename(state: &mut Board, name: String) -> Result<(), anyhow::Error> {
674 state.name = name;
675 Ok(())
676}
677
678async fn task_lock<S: BlockStorage + Clone + 'static>(
679 transaction: &mut BoardTransaction<S>,
680 task_id: &TaskId,
681 lock: &TaskLock,
682) -> Result<(), anyhow::Error> {
683 match lock {
684 TaskLock::None => {
685 let task = transaction.task(task_id).await?;
686 if task.lock.is_some() {
687 Err(anyhow!("Task locked"))
688 } else {
689 Ok(())
690 }
691 },
692 TaskLock::Force => Ok(()),
693 TaskLock::Lock(lock) => {
694 let mut task = transaction.task(task_id).await?;
695 match task.lock {
696 Some(task_lock) if lock == &task_lock => Ok(()),
697 Some(_task_lock) => Err(anyhow!("Task locked")),
698 None => {
699 task.lock = Some(lock.clone());
700 transaction.tasks.get_mut().await?.insert(task_id.clone(), task).await?;
701 Ok(())
702 },
703 }
704 },
705 TaskLock::Unlock(lock) => {
706 let mut task = transaction.task(task_id).await?;
707 match task.lock {
708 Some(task_lock) if lock == &task_lock => {
709 task.lock = None;
710 transaction.tasks.get_mut().await?.insert(task_id.clone(), task).await?;
711 Ok(())
712 },
713 Some(_task_lock) => Err(anyhow!("Task locked")),
714 None => Ok(()),
715 }
716 },
717 }
718}