1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
use std::fmt::Debug;
use std::hash::Hash;

use crate::context_log::CtxError;
use crate::*;
use aitia::Dep;
use aitia::DepError;
use aitia::DepResult;
use kitsune_p2p::dependencies::kitsune_p2p_fetch::TransferMethod;

/// A DhtOpLite along with its corresponding DhtOpHash
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialEq,
    Eq,
    Hash,
    derive_more::Constructor,
    derive_more::Deref,
    derive_more::Into,
)]
pub struct OpInfo {
    #[deref]
    pub(crate) op: DhtOpLite,
    pub(crate) hash: DhtOpHash,
    pub(crate) dep: SysValDeps,
}

impl OpInfo {
    /// Accessor
    pub fn as_hash(&self) -> &DhtOpHash {
        &self.hash
    }
}

pub type OpRef = DhtOpHash;

#[derive(
    Debug,
    Clone,
    PartialEq,
    Eq,
    Hash,
    derive_more::From,
    derive_more::Into,
    serde::Serialize,
    serde::Deserialize,
)]
pub struct ChainOpAction(pub ActionHash, pub ChainOpType);

impl From<ChainOpLite> for ChainOpAction {
    fn from(op: ChainOpLite) -> Self {
        Self(op.action_hash().clone(), op.get_type())
    }
}

impl ChainOpAction {
    pub fn action_hash(&self) -> &ActionHash {
        &self.0
    }

    pub fn op_type(&self) -> &ChainOpType {
        &self.1
    }
}

pub type SleuthId = String;

#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum Event {
    /// The node has integrated an op authored by someone else
    Integrated {
        by: SleuthId,
        op: OpRef,
    },
    /// The node has app validated an op authored by someone else
    AppValidated {
        by: SleuthId,
        op: OpRef,
    },
    /// The node has sys validated an op authored by someone else
    SysValidated {
        by: SleuthId,
        op: OpRef,
    },

    /// TODO: handle a missing app validation dep
    MissingAppValDep {
        by: SleuthId,
        op: OpRef,
        deps: Vec<AnyDhtHash>,
    },
    /// The node has fetched an op after hearing about the hash via publish or gossip
    Fetched {
        by: SleuthId,
        op: OpRef,
    },
    /// The node has published or gossiped this at least once, to somebody
    SentHash {
        by: SleuthId,
        op: OpRef,
        method: TransferMethod,
    },
    /// The node has received an op hash via publish or gossip
    ReceivedHash {
        by: SleuthId,
        op: OpRef,
        method: TransferMethod,
    },
    /// The node has authored this op, including validation and integration
    Authored {
        by: AgentPubKey,
        op: OpInfo,
    },
    /// An agent has joined the network
    AgentJoined {
        node: SleuthId,
        agent: AgentPubKey,
    },
    // XXX: this is a replacement for a proper AgentLeave. This just lets us act as if every
    // agent in the SweetConductor has left
    SweetConductorShutdown {
        node: SleuthId,
    },
}

impl aitia::logging::FactLogJson for Event {}

impl aitia::Fact for Event {
    type Context = Context;

    fn explain(&self, ctx: &Self::Context) -> String {
        match self {
            Event::Integrated { by, op } => {
                format!("[{by}] Integrated: {op}")
            }
            Event::AppValidated { by, op } => {
                format!("[{by}] AppValidated: {op}")
            }
            Event::SysValidated { by, op } => {
                format!("[{by}] SysValidated: {op}")
            }
            Event::MissingAppValDep { by, op, deps } => {
                format!("[{by}] PendingAppValidation: {op} deps: {deps:#?}")
            }
            Event::Fetched { by, op } => format!("[{by}] Fetched: {op}"),
            Event::SentHash { by, op, method } => format!("[{by}] SentHash({method}): {op:?}"),
            Event::ReceivedHash { by, op, method } => {
                format!("[{by}] ReceivedHash({method}): {op:?}")
            }
            Event::Authored { by, op } => {
                let node = ctx.agent_node(by).expect("I got lazy");
                let op_hash = op.as_hash();
                format!("[{node}] Authored: {op_hash}")
            }
            Event::AgentJoined { node, agent } => {
                format!("[{node}] AgentJoined: {agent}")
            }
            Event::SweetConductorShutdown { node } => {
                format!("[{node}] SweetConductorShutdown")
            }
        }
    }

    fn dep(&self, ctx: &Self::Context) -> DepResult<Self> {
        use Event::*;

        let mapper = |e: CtxError| DepError {
            info: e,
            fact: Some(self.clone()),
        };

        Ok(match self.clone() {
            // Op hashes only get gossiped and published by a node after being fully integrated by that node
            // TODO: could add more antecedents
            SentHash { by, op, method: _ } => Some(Self::authority(ctx, by, op)?),

            // Ops get integrated directly after being app validated
            Integrated { by, op } => Some(
                AppValidated {
                    by: by.clone(),
                    op: op.clone(),
                }
                .into(),
            ),

            // Ops get app validated directly after being sys validated
            AppValidated { by, op } => Some(SysValidated { by, op }.into()),

            // TODO
            MissingAppValDep {
                by: _,
                op: _,
                deps: _,
            } => todo!(),

            // Ops can only be sys validated after being fetched from an authority, and after
            // its dependency has been integrated
            SysValidated { by, op } => {
                let dep = ctx.sysval_op_deps(&op).map_err(mapper)?;

                let fetched = Fetched {
                    by: by.clone(),
                    op: op.clone(),
                }
                .into();

                // TODO: this must be generalized to support multiple dependencies.
                //       we're only using the first here.
                if let Some(dep) = dep.first() {
                    let integrated = Dep::from(Integrated {
                        by,
                        op: dep.hash.clone(),
                    });
                    // TODO: eventually we don't want to just use anything we fetched, right?
                    // TODO: currently we don't actually need to integrate the dep, it can just exist in the cache
                    Some(Dep::every_named("Exists", vec![fetched, integrated]))
                } else {
                    Some(fetched)
                }
            }

            // An op can be fetched only if its hash is in the fetch pool, which happens
            // whenever the op is received by any method
            Fetched { by, op } => Some(Dep::any_named(
                "ReceivedHash",
                [TransferMethod::Publish, TransferMethod::Gossip]
                    .into_iter()
                    .map(|method| {
                        ReceivedHash {
                            by: by.clone(),
                            op: op.clone(),
                            method,
                        }
                        .into()
                    })
                    .collect(),
            )),

            // We can only receive a hash via a given method if some other node has sent it
            // via that method
            ReceivedHash { by, op, method } => {
                let others: Vec<_> = ctx
                    .map_node_to_agents
                    .keys()
                    .filter(|i| **i != by)
                    .cloned()
                    .map(|i| {
                        SentHash {
                            by: i,
                            op: op.clone(),
                            method,
                        }
                        .into()
                    })
                    .collect();
                Some(Dep::any_named("Received hash from authority", others))
            }

            // An agent can author an op at any time, but must have joined the network first
            Authored { by, op: _ } => {
                let node = ctx.agent_node(&by).map_err(mapper)?.clone();
                Some(Dep::from(AgentJoined { node, agent: by }))
            }

            // An agent can join at any time
            AgentJoined { .. } => None,

            // "Special" cause
            SweetConductorShutdown { .. } => None,
        })
    }

    fn check(&self, ctx: &Self::Context) -> bool {
        ctx.check(self)
    }
}

impl Event {
    /// The cause which is satisfied by either Integrating this op,
    /// or having authored this op by any of the local agents
    #[allow(clippy::result_large_err)]
    pub fn authority(ctx: &Context, by: SleuthId, op: OpRef) -> Result<Dep<Self>, DepError<Self>> {
        let integrated = Self::Integrated {
            by: by.clone(),
            op: op.clone(),
        }
        .into();
        let mut any = vec![integrated];

        let mapper = |e: CtxError| DepError {
            info: e,
            fact: None,
        };

        let op_info = ctx.op_info(&op).map_err(mapper)?;
        let authors = ctx
            .node_agents(&by)
            .map_err(mapper)?
            .iter()
            .cloned()
            .map(|agent| Self::Authored {
                by: agent,
                op: op_info.clone(),
            })
            .map(Dep::from);

        any.extend(authors);
        Ok(Dep::any_named("Authority", any))
    }
}