kitsune2_api/
agent.rs

1use crate::*;
2use std::sync::Arc;
3
4/// Defines a type capable of cryptographic signatures.
5pub trait Signer {
6    /// Sign the encoded data, returning the resulting detached signature bytes.
7    fn sign<'a, 'b: 'a, 'c: 'a>(
8        &'a self,
9        agent_info: &'b AgentInfo,
10        message: &'c [u8],
11    ) -> BoxFut<'a, K2Result<bytes::Bytes>>;
12}
13
14/// Defines a type capable of cryptographic verification.
15pub trait Verifier: std::fmt::Debug {
16    /// Verify the provided detached signature over the provided message.
17    /// Returns `true` if the signature is valid.
18    fn verify(
19        &self,
20        agent_info: &AgentInfo,
21        message: &[u8],
22        signature: &[u8],
23    ) -> bool;
24}
25
26/// Trait-object [Verifier].
27pub type DynVerifier = Arc<dyn Verifier + 'static + Send + Sync>;
28
29impl Verifier for DynVerifier {
30    fn verify(
31        &self,
32        agent_info: &AgentInfo,
33        message: &[u8],
34        signature: &[u8],
35    ) -> bool {
36        (**self).verify(agent_info, message, signature)
37    }
38}
39
40/// A "Local" agent is an agent that is connected to the local Kitsune2 node,
41/// and is able to sign messages and agent infos.
42pub trait LocalAgent: Signer + 'static + Send + Sync + std::fmt::Debug {
43    /// The [AgentId] of this local agent.
44    fn agent(&self) -> &AgentId;
45
46    /// Register a callback to be invoked when [Self::invoke_cb] is called.
47    /// Implementations need only track a single cb. If this is called again,
48    /// use only the new one.
49    fn register_cb(&self, cb: Arc<dyn Fn() + 'static + Send + Sync>);
50
51    /// Invoke the registered cb if one has been set.
52    /// This can be treated as a no-op rather than an error if [Self::register_cb] has not yet been called.
53    fn invoke_cb(&self);
54
55    /// Access the current storage arc for this local agent.
56    ///
57    /// This will be used by the space module to construct [AgentInfoSigned].
58    fn get_cur_storage_arc(&self) -> DhtArc;
59
60    /// Set the current storage arc for this local agent.
61    /// This will be initially set to zero on space join.
62    /// The gossip module will update this as data is collected.
63    fn set_cur_storage_arc(&self, arc: DhtArc);
64
65    /// This is a chance for the implementor to influence how large
66    /// a storage arc should be for this agent. The gossip module will
67    /// attempt to collect enough data for claiming storage authority
68    /// over this range.
69    fn get_tgt_storage_arc(&self) -> DhtArc;
70
71    /// The sharding module will attempt to determine an ideal target
72    /// arc for this agent. An implementation is free to use or discard
73    /// this information when returning the arc in [Self::get_tgt_storage_arc].
74    /// This will initially be set to zero on join, but the sharding module
75    /// may later update this to FULL or a true target value.
76    fn set_tgt_storage_arc_hint(&self, arc: DhtArc);
77}
78
79/// Trait-object [LocalAgent].
80pub type DynLocalAgent = Arc<dyn LocalAgent>;
81
82impl LocalAgent for DynLocalAgent {
83    fn agent(&self) -> &AgentId {
84        (**self).agent()
85    }
86
87    fn register_cb(&self, cb: Arc<dyn Fn() + 'static + Send + Sync>) {
88        (**self).register_cb(cb);
89    }
90
91    fn invoke_cb(&self) {
92        (**self).invoke_cb();
93    }
94
95    fn get_cur_storage_arc(&self) -> DhtArc {
96        (**self).get_cur_storage_arc()
97    }
98
99    fn set_cur_storage_arc(&self, arc: DhtArc) {
100        (**self).set_cur_storage_arc(arc);
101    }
102
103    fn get_tgt_storage_arc(&self) -> DhtArc {
104        (**self).get_tgt_storage_arc()
105    }
106
107    fn set_tgt_storage_arc_hint(&self, arc: DhtArc) {
108        (**self).set_tgt_storage_arc_hint(arc);
109    }
110}
111
112impl Signer for DynLocalAgent {
113    fn sign<'a, 'b: 'a, 'c: 'a>(
114        &'a self,
115        agent_info: &'b AgentInfo,
116        message: &'c [u8],
117    ) -> BoxFut<'a, K2Result<bytes::Bytes>> {
118        (**self).sign(agent_info, message)
119    }
120}
121
122mod serde_string_timestamp {
123    pub fn serialize<S>(
124        t: &crate::Timestamp,
125        serializer: S,
126    ) -> Result<S::Ok, S::Error>
127    where
128        S: serde::Serializer,
129    {
130        serializer.serialize_str(&t.as_micros().to_string())
131    }
132
133    pub fn deserialize<'de, D>(
134        deserializer: D,
135    ) -> Result<crate::Timestamp, D::Error>
136    where
137        D: serde::Deserializer<'de>,
138    {
139        let s: &'de str = serde::Deserialize::deserialize(deserializer)?;
140        let i: i64 = s.parse().map_err(serde::de::Error::custom)?;
141        Ok(crate::Timestamp::from_micros(i))
142    }
143}
144
145/// AgentInfo stores metadata related to agents.
146#[derive(
147    Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, Hash,
148)]
149#[serde(rename_all = "camelCase")]
150pub struct AgentInfo {
151    /// The agent id.
152    pub agent: AgentId,
153
154    /// The space id.
155    pub space: SpaceId,
156
157    /// When this metadata was created.
158    #[serde(with = "serde_string_timestamp")]
159    pub created_at: Timestamp,
160
161    /// When this metadata will expire.
162    #[serde(with = "serde_string_timestamp")]
163    pub expires_at: Timestamp,
164
165    /// If `true`, this metadata is a tombstone, indicating
166    /// the agent has gone offline, and is no longer reachable.
167    pub is_tombstone: bool,
168
169    /// If set, this indicates the primary url at which this agent may
170    /// be reached. This should largely only be UNSET if this is a tombstone.
171    pub url: Option<Url>,
172
173    /// The arc over which this agent claims authority.
174    #[serde(default = "DhtArc::default")]
175    pub storage_arc: DhtArc,
176}
177
178/// Signed agent information.
179#[derive(PartialEq, Eq, Hash)]
180pub struct AgentInfoSigned {
181    /// The decoded information associated with this agent.
182    agent_info: AgentInfo,
183
184    /// The encoded information that was signed.
185    encoded: String,
186
187    /// The signature.
188    signature: bytes::Bytes,
189}
190
191impl std::fmt::Debug for AgentInfoSigned {
192    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
193        f.write_str("AgentInfoSigned(")?;
194        f.write_str(&self.encoded)?;
195        f.write_str(")")
196    }
197}
198
199impl AgentInfoSigned {
200    /// Generate a signed agent info by signing an agent info.
201    pub async fn sign<S: Signer>(
202        signer: &S,
203        agent_info: AgentInfo,
204    ) -> K2Result<std::sync::Arc<Self>> {
205        let encoded = serde_json::to_string(&agent_info)
206            .map_err(|e| K2Error::other_src("encoding agent_info", e))?;
207        let signature = signer
208            .sign(&agent_info, encoded.as_bytes())
209            .await
210            .map_err(|e| K2Error::other_src("signing agent_info", e))?;
211        Ok(std::sync::Arc::new(Self {
212            agent_info,
213            encoded,
214            signature,
215        }))
216    }
217
218    /// Decode a canonical json encoding of a signed agent info.
219    pub fn decode<V: Verifier>(
220        verifier: &V,
221        encoded: &[u8],
222    ) -> K2Result<std::sync::Arc<Self>> {
223        #[derive(serde::Deserialize)]
224        #[serde(rename_all = "camelCase")]
225        struct Ref {
226            agent_info: String,
227            #[serde(with = "crate::serde_bytes_base64")]
228            signature: bytes::Bytes,
229        }
230        let v: Ref = serde_json::from_slice(encoded)
231            .map_err(|e| K2Error::other_src("decoding agent_info", e))?;
232        Self::inner_decode_one(verifier, v.agent_info, v.signature)
233    }
234
235    /// Decode a canonical json encoding of a list of signed agent infos.
236    pub fn decode_list<V: Verifier>(
237        verifier: &V,
238        encoded: &[u8],
239    ) -> K2Result<Vec<K2Result<std::sync::Arc<Self>>>> {
240        #[derive(serde::Deserialize)]
241        #[serde(rename_all = "camelCase")]
242        struct Ref {
243            agent_info: String,
244            #[serde(with = "crate::serde_bytes_base64")]
245            signature: bytes::Bytes,
246        }
247        let v: Vec<Ref> = serde_json::from_slice(encoded)
248            .map_err(|e| K2Error::other_src("decoding agent_info", e))?;
249        Ok(v.into_iter()
250            .map(|v| {
251                Self::inner_decode_one(verifier, v.agent_info, v.signature)
252            })
253            .collect())
254    }
255
256    fn inner_decode_one<V: Verifier>(
257        verifier: &V,
258        agent_info: String,
259        signature: bytes::Bytes,
260    ) -> K2Result<std::sync::Arc<Self>> {
261        let info: AgentInfo = serde_json::from_str(&agent_info)
262            .map_err(|e| K2Error::other_src("decoding inner agent_info", e))?;
263        if !verifier.verify(&info, agent_info.as_bytes(), &signature) {
264            return Err(K2Error::other("InvalidSignature"));
265        }
266        Ok(std::sync::Arc::new(Self {
267            agent_info: info,
268            encoded: agent_info,
269            signature,
270        }))
271    }
272
273    /// Get the canonical json encoding of this signed agent info.
274    pub fn encode(&self) -> K2Result<String> {
275        #[derive(serde::Serialize)]
276        #[serde(rename_all = "camelCase")]
277        struct Ref<'a> {
278            agent_info: &'a String,
279            #[serde(with = "crate::serde_bytes_base64")]
280            signature: &'a bytes::Bytes,
281        }
282        serde_json::to_string(&Ref {
283            agent_info: &self.encoded,
284            signature: &self.signature,
285        })
286        .map_err(|e| K2Error::other_src("encoding agent_info", e))
287    }
288
289    /// Access the inner [AgentInfo] data. Note, you can instead just deref.
290    pub fn get_agent_info(&self) -> &AgentInfo {
291        self
292    }
293
294    /// Access the canonical encoded inner agent info.
295    pub fn get_encoded(&self) -> &str {
296        &self.encoded
297    }
298
299    /// Access the signature over the encoded inner agent info.
300    pub fn get_signature(&self) -> &bytes::Bytes {
301        &self.signature
302    }
303}
304
305impl std::ops::Deref for AgentInfoSigned {
306    type Target = AgentInfo;
307
308    fn deref(&self) -> &Self::Target {
309        &self.agent_info
310    }
311}
312
313#[cfg(test)]
314mod test {
315    use super::*;
316
317    const SIG: &[u8] = b"fake-signature";
318
319    #[derive(Debug)]
320    struct TestCrypto;
321
322    impl Signer for TestCrypto {
323        fn sign<'a, 'b: 'a, 'c: 'a>(
324            &'a self,
325            _agent_info: &'b AgentInfo,
326            _encoded: &'c [u8],
327        ) -> BoxFut<'a, K2Result<bytes::Bytes>> {
328            Box::pin(async move { Ok(bytes::Bytes::from_static(SIG)) })
329        }
330    }
331
332    impl Verifier for TestCrypto {
333        fn verify(
334            &self,
335            _agent_info: &AgentInfo,
336            _message: &[u8],
337            signature: &[u8],
338        ) -> bool {
339            signature == SIG
340        }
341    }
342
343    #[tokio::test(flavor = "multi_thread")]
344    async fn happy_encode_decode() {
345        let agent: AgentId = bytes::Bytes::from_static(b"test-agent").into();
346        let space_id: SpaceId = bytes::Bytes::from_static(b"test-space").into();
347        let now = Timestamp::from_micros(1731690797907204);
348        let later = Timestamp::from_micros(now.as_micros() + 72_000_000_000);
349        let url = Some(Url::from_str("ws://test.com:80/test-url").unwrap());
350        let storage_arc = DhtArc::Arc(42, u32::MAX / 13);
351
352        let enc = AgentInfoSigned::sign(
353            &TestCrypto,
354            AgentInfo {
355                agent: agent.clone(),
356                space: space_id.clone(),
357                created_at: now,
358                expires_at: later,
359                is_tombstone: false,
360                url: url.clone(),
361                storage_arc,
362            },
363        )
364        .await
365        .unwrap()
366        .encode()
367        .unwrap();
368
369        assert_eq!(
370            r#"{"agentInfo":"{\"agent\":\"dGVzdC1hZ2VudA\",\"space\":\"dGVzdC1zcGFjZQ\",\"createdAt\":\"1731690797907204\",\"expiresAt\":\"1731762797907204\",\"isTombstone\":false,\"url\":\"ws://test.com:80/test-url\",\"storageArc\":[42,330382099]}","signature":"ZmFrZS1zaWduYXR1cmU"}"#,
371            enc
372        );
373
374        let dec = AgentInfoSigned::decode(&TestCrypto, enc.as_bytes()).unwrap();
375        assert_eq!(agent, dec.agent);
376        assert_eq!(space_id, dec.space);
377        assert_eq!(now, dec.created_at);
378        assert_eq!(later, dec.expires_at);
379        assert!(!dec.is_tombstone);
380        assert_eq!(url, dec.url);
381        assert_eq!(storage_arc, dec.storage_arc);
382    }
383
384    #[tokio::test(flavor = "multi_thread")]
385    async fn ignores_future_extension_fields() {
386        AgentInfoSigned::decode(&TestCrypto, br#"{"agentInfo":"{\"agent\":\"dGVzdC1hZ2VudA\",\"space\":\"dGVzdC1zcGFjZQ\",\"createdAt\":\"1731690797907204\",\"expiresAt\":\"1731762797907204\",\"isTombstone\":false,\"url\":\"ws://test.com:80/test-url\",\"storageArc\":[42,330382099],\"fakeField\":\"bla\"}","signature":"ZmFrZS1zaWduYXR1cmU","fakeField2":"bla2"}"#).unwrap();
387    }
388
389    #[tokio::test(flavor = "multi_thread")]
390    async fn fills_in_default_fields() {
391        let dec = AgentInfoSigned::decode(&TestCrypto, br#"{"agentInfo":"{\"agent\":\"dGVzdC1hZ2VudA\",\"space\":\"dGVzdC1zcGFjZQ\",\"createdAt\":\"1731690797907204\",\"expiresAt\":\"1731762797907204\",\"isTombstone\":false}","signature":"ZmFrZS1zaWduYXR1cmU"}"#).unwrap();
392        assert!(dec.url.is_none());
393        assert_eq!(DhtArc::Empty, dec.storage_arc);
394    }
395
396    #[tokio::test(flavor = "multi_thread")]
397    async fn dies_with_invalid_signature() {
398        let dec = AgentInfoSigned::decode(&TestCrypto, br#"{"agentInfo":"{\"agent\":\"dGVzdC1hZ2VudA\",\"space\":\"dGVzdC1zcGFjZQ\",\"createdAt\":\"1731690797907204\",\"expiresAt\":\"1731762797907204\",\"isTombstone\":false}","signature":""}"#).unwrap_err();
399        assert!(dec.to_string().contains("InvalidSignature"));
400    }
401}