dora_node_api/event_stream/event.rs
1use dora_arrow_convert::ArrowData;
2use dora_core::config::{DataId, NodeId, OperatorId};
3use dora_message::metadata::Metadata;
4
5/// Represents an incoming Dora event.
6///
7/// Events might be triggered by other nodes, by Dora itself, or by some external user input.
8///
9/// It's safe to ignore event types that are not relevant to the node.
10///
11/// This enum is marked as `non_exhaustive` because we might add additional
12/// variants in the future. Please ignore unknown event types instead of throwing an
13/// error to avoid breakage when updating Dora.
14#[derive(Debug)]
15#[non_exhaustive]
16#[allow(clippy::large_enum_variant)]
17pub enum Event {
18 /// An input was received from another node.
19 ///
20 /// This event corresponds to one of the `inputs` of the node as specified
21 /// in the dataflow YAML file.
22 Input {
23 /// The input ID, as specified in the YAML file.
24 ///
25 /// Note that this is not the output ID of the sender, but the ID
26 /// assigned to the input in the YAML file.
27 id: DataId,
28 /// Meta information about this input, e.g. the timestamp.
29 metadata: Metadata,
30 /// The actual data in the Apache Arrow data format.
31 data: ArrowData,
32 },
33 /// An input was closed by the sender.
34 ///
35 /// The sending node mapped to an input exited, so this input will receive
36 /// no more data.
37 InputClosed {
38 /// The ID of the input that was closed, as specified in the YAML file.
39 ///
40 /// Note that this is not the output ID of the sender, but the ID
41 /// assigned to the input in the YAML file.
42 id: DataId,
43 },
44 /// A node failed and exited with a non-zero exit code.
45 ///
46 /// The daemon automatically creates this event when a node exits with a non-zero exit code.
47 /// This allows downstream nodes to handle the error gracefully (e.g., use cached data,
48 /// switch to backup source, log and continue).
49 NodeFailed {
50 /// The IDs of the inputs that are affected by the node failure, as specified in the YAML file.
51 ///
52 /// A node failure can affect multiple inputs if the failed node produced multiple outputs
53 /// that are consumed by this node.
54 affected_input_ids: Vec<DataId>,
55 /// The error message describing the failure.
56 error: String,
57 /// The ID of the node that failed.
58 source_node_id: NodeId,
59 },
60 /// Notification that the event stream is about to close.
61 ///
62 /// The [`StopCause`] field contains the reason for the event stream closure.
63 ///
64 /// Nodes should exit once the event stream closes.
65 Stop(StopCause),
66 /// Instructs the node to reload itself or one of its operators.
67 ///
68 /// This event is currently only used for reloading Python operators that are
69 /// started by a `dora runtime` process. So this event should not be sent to normal
70 /// nodes yet.
71 Reload {
72 /// The ID of the operator that should be reloaded.
73 ///
74 /// There is currently no case where `operator_id` is `None`.
75 operator_id: Option<OperatorId>,
76 },
77 /// Notifies the node about an unexpected error that happened inside Dora.
78 ///
79 /// It's a good idea to output or log this error for debugging.
80 Error(String),
81}
82
83/// The reason for closing the event stream.
84///
85/// This enum is marked as `non_exhaustive` because we might add additional
86/// variants in the future.
87#[derive(Debug, Clone)]
88#[non_exhaustive]
89pub enum StopCause {
90 /// The dataflow is stopped early after a `dora stop` command (or on `ctrl-c`).
91 ///
92 /// Nodes should exit as soon as possible if they receive a stop event of
93 /// this type. Dora will kill nodes that keep running for too long after
94 /// receiving such a stop event.
95 Manual,
96 /// The event stream is closed because all of the node's inputs were closed.
97 ///
98 /// This stop event type is only sent for nodes that have at least one input.
99 AllInputsClosed,
100}