Skip to main content

co_core_board/
lib.rs

1// SPDX-License-Identifier: AGPL-3.0-only
2// Copyright (C) 2026 1io BRANDGUARDIAN GmbH
3
4use anyhow::anyhow;
5use cid::Cid;
6use co_api::{
7	co, BlockStorage, BlockStorageExt, CoList, CoListIndex, CoMap, CoTryStreamExt, CoreBlockStorage, IsDefault,
8	LazyTransaction, Link, OptionLink, Reducer, ReducerAction, Tags,
9};
10use futures::{pin_mut, FutureExt, TryStreamExt};
11use std::future::ready;
12
13pub type ListName = String;
14pub type TaskId = String;
15
16/// Board actions.
17#[co]
18pub enum BoardAction {
19	BoardRename(String),
20	BoardTagsInsert(Tags),
21	BoardTagsRemove(Tags),
22	ListCreate { list: List, after: Option<ListName> },
23	ListArrange { name: ListName, after: Option<ListName> },
24	ListDelete { name: ListName, move_tasks_to_list: Option<ListName> },
25	ListTagsInsert(ListName, Tags),
26	ListTagsRemove(ListName, Tags),
27	// ListTasksDelete(ListName),
28	// ListTasksMove { from: ListName, to: ListName },
29	TaskCreate { list: ListName, task: Task, after: Option<TaskId> },
30	TaskMove { from_list: Option<ListName>, list: ListName, task: TaskId, after: Option<TaskId>, lock: TaskLock },
31	TaskArrange { task: TaskId, after: Option<TaskId> },
32	TaskDelete(TaskId),
33	TaskRename(TaskId, String),
34	TaskPayloadChange(TaskId, Option<Cid>),
35	TaskTagsInsert(TaskId, Tags),
36	TaskTagsRemove(TaskId, Tags),
37}
38
39#[co(state)]
40pub struct Board {
41	/// Board name.
42	#[serde(rename = "n", default, skip_serializing_if = "String::is_empty")]
43	pub name: String,
44
45	/// Board lists.
46	#[serde(rename = "l", default, skip_serializing_if = "CoList::is_empty")]
47	pub lists: CoList<List>,
48
49	/// Board tags.
50	#[serde(rename = "t", default, skip_serializing_if = "Tags::is_empty")]
51	pub tags: Tags,
52
53	/// Board tasks.
54	#[serde(rename = "i", default, skip_serializing_if = "CoMap::is_empty")]
55	pub tasks: CoMap<TaskId, Task>,
56}
57impl Reducer<BoardAction> for Board {
58	async fn reduce(
59		state_link: OptionLink<Self>,
60		event_link: Link<ReducerAction<BoardAction>>,
61		storage: &CoreBlockStorage,
62	) -> Result<Link<Self>, anyhow::Error> {
63		let event = storage.get_value(&event_link).await?;
64		let mut state = storage.get_value_or_default(&state_link).await?;
65		reduce(storage, &mut state, event.payload).await?;
66		Ok(storage.set_value(&state).await?)
67	}
68}
69
70#[co]
71pub struct List {
72	/// List name.
73	#[serde(rename = "n")]
74	pub name: ListName,
75
76	/// List tasks.
77	#[serde(rename = "i", default, skip_serializing_if = "CoList::is_empty")]
78	pub tasks: CoList<TaskId>,
79
80	/// List tags.
81	#[serde(rename = "t", default, skip_serializing_if = "Tags::is_empty")]
82	pub tags: Tags,
83}
84impl List {
85	pub fn new(name: impl Into<ListName>) -> Self {
86		Self { name: name.into(), tags: Default::default(), tasks: Default::default() }
87	}
88}
89
90#[co]
91pub struct Task {
92	/// Task unique id.
93	#[serde(rename = "u")]
94	pub id: TaskId,
95
96	/// Task name.
97	#[serde(rename = "n")]
98	pub name: String,
99
100	/// Task tags.
101	#[serde(rename = "t", default, skip_serializing_if = "Tags::is_empty")]
102	pub tags: Tags,
103
104	/// Task payload.
105	#[serde(rename = "p", default, skip_serializing_if = "Option::is_none")]
106	pub payload: Option<Cid>,
107
108	/// Task exclusive lock identifier.
109	#[serde(rename = "l", default, skip_serializing_if = "IsDefault::is_default")]
110	pub lock: Option<String>,
111}
112
113#[co]
114#[derive(Default)]
115pub enum TaskLock {
116	/// No lock.
117	/// Fail the operation if the subject is locked.
118	#[default]
119	None,
120
121	/// Force operation if the subject is locked.
122	Force,
123
124	/// Use or apply a lock.
125	/// Fail the operation if the subject is locked with a different lock.
126	Lock(String),
127
128	/// Use and unlock after the operation.
129	/// Fail the operation if the subject is locked with a different lock.
130	Unlock(String),
131}
132
133async fn reduce<S>(storage: &S, state: &mut Board, action: BoardAction) -> Result<(), anyhow::Error>
134where
135	S: BlockStorage + Clone + 'static,
136{
137	// open
138	let mut transaction = BoardTransaction {
139		storage: storage.clone(),
140		lists: LazyTransaction::new(storage.clone(), state.lists.clone()),
141		tasks: LazyTransaction::new(storage.clone(), state.tasks.clone()),
142	};
143
144	// reduce
145	match action {
146		BoardAction::BoardRename(name) => reduce_board_rename(state, name).boxed().await?,
147		BoardAction::BoardTagsInsert(tags) => reduce_board_tags_insert(state, tags).boxed().await?,
148		BoardAction::BoardTagsRemove(tags) => reduce_board_tags_remove(state, tags).boxed().await?,
149		BoardAction::ListCreate { list, after } => reduce_list_create(&mut transaction, list, after).boxed().await?,
150		BoardAction::ListArrange { name, after } => reduce_list_arrange(&mut transaction, name, after).boxed().await?,
151		BoardAction::ListDelete { name, move_tasks_to_list } => {
152			reduce_list_delete(&mut transaction, name, move_tasks_to_list).boxed().await?
153		},
154		BoardAction::ListTagsInsert(name, tags) => {
155			reduce_list_tags_insert(&mut transaction, name, tags).boxed().await?
156		},
157		BoardAction::ListTagsRemove(name, tags) => {
158			reduce_list_tags_remove(&mut transaction, name, tags).boxed().await?
159		},
160		BoardAction::TaskCreate { list, task, after } => {
161			reduce_task_create(&mut transaction, list, task, after).boxed().await?
162		},
163		BoardAction::TaskMove { from_list, list, task, after, lock } => {
164			reduce_task_move(&mut transaction, from_list, list, task, after, lock)
165				.boxed()
166				.await?
167		},
168		BoardAction::TaskArrange { task, after } => reduce_task_arrange(&mut transaction, task, after).boxed().await?,
169		BoardAction::TaskDelete(task) => reduce_task_delete(&mut transaction, task).boxed().await?,
170		BoardAction::TaskRename(task, name) => reduce_task_rename(&mut transaction, task, name).boxed().await?,
171		BoardAction::TaskPayloadChange(task, cid) => {
172			reduce_task_payload_change(&mut transaction, task, cid).boxed().await?
173		},
174		BoardAction::TaskTagsInsert(task, tags) => {
175			reduce_task_tags_insert(&mut transaction, task, tags).boxed().await?
176		},
177		BoardAction::TaskTagsRemove(task, tags) => {
178			reduce_task_tags_remove(&mut transaction, task, tags).boxed().await?
179		},
180	}
181
182	// store
183	if transaction.lists.is_mut_access() {
184		state.lists = transaction.lists.get_mut().await?.store().await?;
185	}
186	if transaction.tasks.is_mut_access() {
187		state.tasks = transaction.tasks.get_mut().await?.store().await?;
188	}
189
190	// result
191	Ok(())
192}
193
194struct BoardTransaction<S>
195where
196	S: BlockStorage + Clone + 'static,
197{
198	storage: S,
199	lists: LazyTransaction<S, CoList<List>>,
200	tasks: LazyTransaction<S, CoMap<TaskId, Task>>,
201}
202impl<S> BoardTransaction<S>
203where
204	S: BlockStorage + Clone + 'static,
205{
206	/// Find list by name by scanning lists.
207	async fn find_list_by_name(&mut self, name: &str) -> Result<Option<(CoListIndex, List)>, anyhow::Error> {
208		Ok(self
209			.lists
210			.get()
211			.await?
212			.stream()
213			.try_filter(|item| ready(item.1.name == name))
214			.try_first()
215			.await?)
216	}
217
218	/// Fint task's list by scanning all lists.
219	async fn find_task_list(&mut self, id: &TaskId) -> Result<Option<(CoListIndex, List, CoListIndex)>, anyhow::Error> {
220		Ok(self
221			.lists
222			.get()
223			.await?
224			.stream()
225			.try_filter_map(|(index, list)| {
226				let storage = self.storage.clone();
227				async move {
228					Ok(list
229						.tasks
230						.stream(&storage)
231						.try_filter(|(_, task)| ready(task == id))
232						.try_first()
233						.await?
234						.map(|(task_index, _task_id)| (index, list, task_index)))
235				}
236			})
237			.try_first()
238			.await?)
239	}
240
241	/// Get task by id.
242	async fn task(&mut self, task_id: &TaskId) -> Result<Task, anyhow::Error> {
243		self.tasks
244			.get()
245			.await?
246			.get(task_id)
247			.await?
248			.ok_or_else(|| anyhow!("Task not found: {}", task_id))
249	}
250}
251
252async fn reduce_task_tags_remove<S: BlockStorage + Clone + 'static>(
253	transaction: &mut BoardTransaction<S>,
254	task_id: TaskId,
255	tags: Tags,
256) -> Result<(), anyhow::Error> {
257	let mut task = transaction
258		.tasks
259		.get()
260		.await?
261		.get(&task_id)
262		.await?
263		.ok_or(anyhow!("Task not found: {}", task_id))?;
264
265	// apply
266	task.tags.clear(Some(&tags));
267
268	// store
269	transaction.tasks.get_mut().await?.insert(task_id, task).await?;
270
271	Ok(())
272}
273
274async fn reduce_task_tags_insert<S: BlockStorage + Clone + 'static>(
275	transaction: &mut BoardTransaction<S>,
276	task_id: TaskId,
277	mut tags: Tags,
278) -> Result<(), anyhow::Error> {
279	let mut task = transaction
280		.tasks
281		.get()
282		.await?
283		.get(&task_id)
284		.await?
285		.ok_or(anyhow!("Task not found: {}", task_id))?;
286
287	// apply
288	task.tags.append(&mut tags);
289
290	// store
291	transaction.tasks.get_mut().await?.insert(task_id, task).await?;
292
293	Ok(())
294}
295
296async fn reduce_task_payload_change<S: BlockStorage + Clone + 'static>(
297	transaction: &mut BoardTransaction<S>,
298	task_id: TaskId,
299	payload: Option<Cid>,
300) -> Result<(), anyhow::Error> {
301	let mut task = transaction
302		.tasks
303		.get()
304		.await?
305		.get(&task_id)
306		.await?
307		.ok_or(anyhow!("Task not found: {}", task_id))?;
308
309	// apply
310	if task.payload != payload {
311		// set
312		task.payload = payload;
313
314		// store
315		transaction.tasks.get_mut().await?.insert(task_id, task).await?;
316	}
317	Ok(())
318}
319
320async fn reduce_task_rename<S: BlockStorage + Clone + 'static>(
321	transaction: &mut BoardTransaction<S>,
322	task_id: TaskId,
323	name: String,
324) -> Result<(), anyhow::Error> {
325	let mut task = transaction
326		.tasks
327		.get()
328		.await?
329		.get(&task_id)
330		.await?
331		.ok_or(anyhow!("Task not found: {}", task_id))?;
332
333	// apply
334	if task.name != name {
335		// set
336		task.name = name;
337
338		// store
339		transaction.tasks.get_mut().await?.insert(task_id, task).await?;
340	}
341	Ok(())
342}
343
344async fn reduce_task_delete<S: BlockStorage + Clone + 'static>(
345	transaction: &mut BoardTransaction<S>,
346	task_id: TaskId,
347) -> Result<(), anyhow::Error> {
348	// find task list
349	let (list_index, mut list, task_index) = transaction
350		.find_task_list(&task_id)
351		.await?
352		.ok_or(anyhow!("Task list not found: {}", task_id))?;
353
354	// remove
355	transaction
356		.tasks
357		.get_mut()
358		.await?
359		.remove(task_id.clone())
360		.await?
361		.ok_or(anyhow!("Task not found: {}", task_id))?;
362
363	// remove from list
364	list.tasks.remove(&transaction.storage, task_index).await?;
365
366	// store list
367	transaction.lists.get_mut().await?.set(list_index, list).await?;
368
369	Ok(())
370}
371
372async fn reduce_task_arrange<S: BlockStorage + Clone + 'static>(
373	transaction: &mut BoardTransaction<S>,
374	task_id: TaskId,
375	after: Option<TaskId>,
376) -> Result<(), anyhow::Error> {
377	// find task list
378	let (list_index, mut list, task_index) = transaction
379		.find_task_list(&task_id)
380		.await?
381		.ok_or(anyhow!("Task list not found: {}", task_id))?;
382
383	// after index
384	let mut list_tasks = list.tasks.open(&transaction.storage).await?;
385	let task_after_index = if let Some(after) = &after {
386		list_tasks
387			.stream()
388			.try_filter(|item| ready(&item.1 == after))
389			.try_first()
390			.await?
391			.map(|(index, _)| index)
392	} else {
393		None
394	};
395
396	// remove
397	list_tasks.remove(task_index).await?;
398
399	// insert
400	if let Some(task_after_index) = task_after_index {
401		list_tasks.insert(task_after_index, task_id).await?;
402	} else {
403		list_tasks.push(task_id).await?;
404	}
405
406	// store list
407	list.tasks = list_tasks.store().await?;
408	transaction.lists.get_mut().await?.set(list_index, list).await?;
409
410	Ok(())
411}
412
413async fn reduce_task_move<S: BlockStorage + Clone + 'static>(
414	transaction: &mut BoardTransaction<S>,
415	from_list: Option<ListName>,
416	list_name: ListName,
417	task_id: TaskId,
418	after: Option<TaskId>,
419	lock: TaskLock,
420) -> Result<(), anyhow::Error> {
421	// lock
422	task_lock(transaction, &task_id, &lock).await?;
423
424	// find source list and source list task index
425	let (source_list_index, mut source_list, mut source_list_tasks, source_task_index) =
426		if let Some(from_list) = &from_list {
427			let (source_list_index, source_list) = transaction
428				.find_list_by_name(from_list)
429				.await?
430				.ok_or(anyhow!("List not found: {}", from_list))?;
431			let list_tasks = source_list.tasks.open(&transaction.storage).await?;
432			let source_task_index = list_tasks
433				.stream()
434				.try_filter(|item| ready(item.1 == task_id))
435				.try_first()
436				.await?
437				.map(|(index, _)| index)
438				.ok_or(anyhow!("Task not found: {} in list: {}", task_id, source_list.name))?;
439			(source_list_index, source_list, list_tasks, source_task_index)
440		} else {
441			let (source_list_index, source_list, source_task_index) = transaction
442				.find_task_list(&task_id)
443				.await?
444				.ok_or(anyhow!("Task list not found: {}", task_id))?;
445			let list_tasks = source_list.tasks.open(&transaction.storage).await?;
446			(source_list_index, source_list, list_tasks, source_task_index)
447		};
448
449	// find target list
450	let (list_index, mut list) = transaction
451		.find_list_by_name(&list_name)
452		.await?
453		.ok_or(anyhow!("List not found: {}", list_name))?;
454	let mut list_tasks = list.tasks.open(&transaction.storage).await?;
455
456	// find target list index
457	let task_after_index = if let Some(after) = &after {
458		list_tasks
459			.stream()
460			.try_filter(|item| ready(&item.1 == after))
461			.try_first()
462			.await?
463			.map(|(index, _)| index)
464	} else {
465		None
466	};
467
468	// remove task from source list
469	source_list_tasks.remove(source_task_index).await?;
470
471	// add task to target list
472	if let Some(task_after_index) = task_after_index {
473		list_tasks.insert(task_after_index, task_id.clone()).await?;
474	} else {
475		list_tasks.push(task_id.clone()).await?;
476	}
477
478	// store
479	source_list.tasks = source_list_tasks.store().await?;
480	transaction.lists.get_mut().await?.set(source_list_index, source_list).await?;
481	list.tasks = list_tasks.store().await?;
482	transaction.lists.get_mut().await?.set(list_index, list).await?;
483
484	// result
485	Ok(())
486}
487
488async fn reduce_task_create<S: BlockStorage + Clone + 'static>(
489	transaction: &mut BoardTransaction<S>,
490	list: ListName,
491	task: Task,
492	after: Option<TaskId>,
493) -> Result<(), anyhow::Error> {
494	let task_id = task.id.clone();
495
496	// find list
497	let (list_index, mut list) = transaction
498		.find_list_by_name(&list)
499		.await?
500		.ok_or(anyhow!("List not found: {}", list))?;
501
502	// validate id is unique
503	if transaction.tasks.get().await?.contains_key(&task_id).await? {
504		return Err(anyhow!("Task exists: {}", task_id));
505	}
506
507	// create task
508	transaction.tasks.get_mut().await?.insert(task_id.clone(), task).await?;
509
510	// add to list
511	let mut list_tasks = list.tasks.open(&transaction.storage).await?;
512	let task_after_index = if let Some(after) = &after {
513		list_tasks
514			.stream()
515			.try_filter(|item| ready(&item.1 == after))
516			.try_first()
517			.await?
518			.map(|(index, _)| index)
519	} else {
520		None
521	};
522	if let Some(task_after_index) = task_after_index {
523		list_tasks.insert(task_after_index, task_id).await?;
524	} else {
525		list_tasks.push(task_id).await?;
526	}
527
528	// store list
529	list.tasks = list_tasks.store().await?;
530	transaction.lists.get_mut().await?.set(list_index, list).await?;
531
532	Ok(())
533}
534
535async fn reduce_list_tags_remove<S: BlockStorage + Clone + 'static>(
536	transaction: &mut BoardTransaction<S>,
537	name: String,
538	tags: Tags,
539) -> Result<(), anyhow::Error> {
540	// find
541	let (list_index, mut list) = transaction
542		.find_list_by_name(&name)
543		.await?
544		.ok_or(anyhow!("List not found: {}", name))?;
545
546	// insert
547	list.tags.clear(Some(&tags));
548
549	// store
550	transaction.lists.get_mut().await?.set(list_index, list).await?;
551	Ok(())
552}
553
554async fn reduce_list_tags_insert<S: BlockStorage + Clone + 'static>(
555	transaction: &mut BoardTransaction<S>,
556	name: String,
557	mut tags: Tags,
558) -> Result<(), anyhow::Error> {
559	// find
560	let (list_index, mut list) = transaction
561		.find_list_by_name(&name)
562		.await?
563		.ok_or(anyhow!("List not found: {}", name))?;
564
565	// insert
566	list.tags.append(&mut tags);
567
568	// store
569	transaction.lists.get_mut().await?.set(list_index, list).await?;
570	Ok(())
571}
572
573async fn reduce_list_delete<S: BlockStorage + Clone + 'static>(
574	transaction: &mut BoardTransaction<S>,
575	name: String,
576	move_tasks_to_list: Option<String>,
577) -> Result<(), anyhow::Error> {
578	// find
579	let (list_index, list) = transaction
580		.find_list_by_name(&name)
581		.await?
582		.ok_or(anyhow!("List not found: {}", name))?;
583
584	// move tasks
585	if let Some(move_tasks_to_list) = &move_tasks_to_list {
586		let (_to_list_index, to_list) = transaction
587			.find_list_by_name(move_tasks_to_list)
588			.await?
589			.ok_or(anyhow!("List not found: {}", name))?;
590		if !list.tasks.is_empty() {
591			let storage = transaction.storage.clone();
592			let tasks = list.tasks.clone();
593			let tasks = tasks.stream(&storage);
594			pin_mut!(tasks);
595			while let Some((_, task)) = tasks.try_next().await? {
596				reduce_task_move(transaction, None, to_list.name.clone(), task, None, TaskLock::Force).await?;
597			}
598		}
599	}
600
601	// delete list
602	transaction.lists.get_mut().await?.remove(list_index).await?;
603
604	Ok(())
605}
606
607async fn reduce_list_arrange<S: BlockStorage + Clone + 'static>(
608	transaction: &mut BoardTransaction<S>,
609	name: String,
610	after: Option<String>,
611) -> Result<(), anyhow::Error> {
612	// find
613	let (list_index, list) = transaction
614		.find_list_by_name(&name)
615		.await?
616		.ok_or(anyhow!("List not found: {}", name))?;
617
618	// find after
619	let after_index = if let Some(after) = &after {
620		transaction.find_list_by_name(after).await?.map(|(index, _)| index)
621	} else {
622		None
623	};
624
625	// remove
626	transaction.lists.get_mut().await?.remove(list_index).await?;
627
628	// create
629	if let Some(after_index) = after_index {
630		transaction.lists.get_mut().await?.insert(after_index, list).await?;
631	} else {
632		transaction.lists.get_mut().await?.push(list).await?;
633	}
634	Ok(())
635}
636
637async fn reduce_list_create<S: BlockStorage + Clone + 'static>(
638	transaction: &mut BoardTransaction<S>,
639	list: List,
640	after: Option<String>,
641) -> Result<(), anyhow::Error> {
642	// verify name not exists yet
643	if transaction.find_list_by_name(&list.name).await?.is_some() {
644		return Err(anyhow!("List already exists: {}", list.name));
645	}
646
647	// find after
648	let after_index = if let Some(after) = &after {
649		transaction.find_list_by_name(after).await?.map(|(index, _)| index)
650	} else {
651		None
652	};
653
654	// create
655	if let Some(after_index) = after_index {
656		transaction.lists.get_mut().await?.insert(after_index, list).await?;
657	} else {
658		transaction.lists.get_mut().await?.push(list).await?;
659	}
660	Ok(())
661}
662
663async fn reduce_board_tags_remove(state: &mut Board, tags: Tags) -> Result<(), anyhow::Error> {
664	state.tags.clear(Some(&tags));
665	Ok(())
666}
667
668async fn reduce_board_tags_insert(state: &mut Board, mut tags: Tags) -> Result<(), anyhow::Error> {
669	state.tags.append(&mut tags);
670	Ok(())
671}
672
673async fn reduce_board_rename(state: &mut Board, name: String) -> Result<(), anyhow::Error> {
674	state.name = name;
675	Ok(())
676}
677
678async fn task_lock<S: BlockStorage + Clone + 'static>(
679	transaction: &mut BoardTransaction<S>,
680	task_id: &TaskId,
681	lock: &TaskLock,
682) -> Result<(), anyhow::Error> {
683	match lock {
684		TaskLock::None => {
685			let task = transaction.task(task_id).await?;
686			if task.lock.is_some() {
687				Err(anyhow!("Task locked"))
688			} else {
689				Ok(())
690			}
691		},
692		TaskLock::Force => Ok(()),
693		TaskLock::Lock(lock) => {
694			let mut task = transaction.task(task_id).await?;
695			match task.lock {
696				Some(task_lock) if lock == &task_lock => Ok(()),
697				Some(_task_lock) => Err(anyhow!("Task locked")),
698				None => {
699					task.lock = Some(lock.clone());
700					transaction.tasks.get_mut().await?.insert(task_id.clone(), task).await?;
701					Ok(())
702				},
703			}
704		},
705		TaskLock::Unlock(lock) => {
706			let mut task = transaction.task(task_id).await?;
707			match task.lock {
708				Some(task_lock) if lock == &task_lock => {
709					task.lock = None;
710					transaction.tasks.get_mut().await?.insert(task_id.clone(), task).await?;
711					Ok(())
712				},
713				Some(_task_lock) => Err(anyhow!("Task locked")),
714				None => Ok(()),
715			}
716		},
717	}
718}