p2panda_store/topics/
sqlite.rs1use std::collections::BTreeMap;
4
5use p2panda_core::cbor::{decode_cbor, encode_cbor};
6use p2panda_core::{LogId, VerifyingKey};
7use serde::{Deserialize, Serialize};
8use sqlx::{query, query_as};
9
10use crate::sqlite::{DecodeError, SqliteError, SqliteStore};
11use crate::topics::TopicStore;
12
13impl<T, L> TopicStore<T, VerifyingKey, L> for SqliteStore
16where
17 T: Serialize + for<'de> Deserialize<'de>,
18 L: LogId,
19{
20 type Error = SqliteError;
21
22 async fn associate(
24 &self,
25 topic: &T,
26 author: &VerifyingKey,
27 data_id: &L,
28 ) -> Result<bool, SqliteError> {
29 let result = self
30 .tx(async |tx| {
31 query(
32 "
33 INSERT OR IGNORE
34 INTO
35 topics_v1 (
36 topic,
37 author,
38 data_id
39 )
40 VALUES
41 (?, ?, ?)
42 ",
43 )
44 .bind(
45 encode_cbor(&topic)
46 .map_err(|err| SqliteError::Encode("topic".to_string(), err))?,
47 )
48 .bind(author.to_string())
49 .bind(
50 encode_cbor(&data_id)
51 .map_err(|err| SqliteError::Encode("data_id".to_string(), err))?,
52 )
53 .execute(&mut **tx)
54 .await
55 .map_err(SqliteError::Sqlite)
56 })
57 .await?;
58 Ok(result.rows_affected() > 0)
59 }
60
61 async fn remove(
63 &self,
64 topic: &T,
65 author: &VerifyingKey,
66 data_id: &L,
67 ) -> Result<bool, SqliteError> {
68 let result = self
69 .tx(async |tx| {
70 query(
71 "
72 DELETE FROM
73 topics_v1
74 WHERE
75 topic = ?
76 AND author = ?
77 AND data_id = ?
78 ",
79 )
80 .bind(
81 encode_cbor(&topic)
82 .map_err(|err| SqliteError::Encode("topic".to_string(), err))?,
83 )
84 .bind(author.to_string())
85 .bind(
86 encode_cbor(&data_id)
87 .map_err(|err| SqliteError::Encode("data_id".to_string(), err))?,
88 )
89 .execute(&mut **tx)
90 .await
91 .map_err(SqliteError::Sqlite)
92 })
93 .await?;
94 Ok(result.rows_affected() > 0)
95 }
96
97 async fn resolve(&self, topic: &T) -> Result<BTreeMap<VerifyingKey, Vec<L>>, Self::Error> {
99 let data_ids = self
100 .execute(async |pool| {
101 query_as::<_, (String, Vec<u8>)>(
102 "
103 SELECT
104 author,
105 data_id
106 FROM
107 topics_v1
108 WHERE
109 topic = ?
110 ",
111 )
112 .bind(
113 encode_cbor(&topic)
114 .map_err(|err| SqliteError::Encode("topic".to_string(), err))?,
115 )
116 .fetch_all(pool)
117 .await
118 .map_err(SqliteError::Sqlite)
119 })
120 .await?;
121
122 let mut result: BTreeMap<VerifyingKey, Vec<L>> = BTreeMap::new();
123
124 for (author, data_id) in data_ids {
125 let author: VerifyingKey = author
126 .parse()
127 .map_err(|_| SqliteError::Decode("author".into(), DecodeError::FromStr))?;
128
129 let data_id = decode_cbor(&data_id[..])
130 .map_err(|err| SqliteError::Decode("data_id".into(), err.into()))?;
131
132 result.entry(author).or_default().push(data_id);
134 }
135
136 Ok(result)
137 }
138}