statefun_sdk/lib.rs
1//! An SDK for writing "stateful functions" in Rust. For use with
2//! [Apache Flink Stateful Functions](https://flink.apache.org/stateful-functions.html) (Statefun).
3//!
4//! # Examples
5//!
6//! The following shows how to write a simple stateful function and serve it for use in a Statefun
7//! deployment.
8//!
9//! ```no_run
10//! use protobuf::well_known_types::StringValue;
11//!
12//! use statefun_sdk::io::kafka;
13//! use statefun_sdk::transport::hyper::HyperHttpTransport;
14//! use statefun_sdk::transport::Transport;
15//! use statefun_sdk::{Address, Context, Effects, EgressIdentifier, FunctionRegistry, FunctionType};
16//!
17//! let mut function_registry = FunctionRegistry::new();
18//!
19//! function_registry.register_fn(
20//! FunctionType::new("example", "function1"),
21//! |context, message: StringValue| {
22//! let mut effects = Effects::new();
23//!
24//! effects.send(
25//! Address::new(FunctionType::new("example", "function2"), "doctor"),
26//! message,
27//! );
28//!
29//! effects
30//! },
31//! );
32//!
33//! let hyper_transport = HyperHttpTransport::new("0.0.0.0:5000".parse()?);
34//! hyper_transport.run(function_registry)?;
35//!
36//! # Ok::<(), failure::Error>(())
37//! ```
38//!
39//! The program creates a [FunctionRegistry](crate::FunctionRegistry), which can be used to
40//! register one or more functions. Then we register a closure as a stateful function. Finally,
41//! we need to create a [Transport](crate::transport::Transport), in this case the
42//! [HyperHttpTransport](crate::transport::hyper::HyperHttpTransport) to serve our stateful
43//! function.
44//!
45//! Not that you can also use a function instead of a closure when registering functions.
46//!
47//! Refer to the Stateful Functions
48//! [documentation](https://ci.apache.org/projects/flink/flink-statefun-docs-master/) to learn how
49//! to use this in a deployment. Especially the
50//! [modules documentation](https://ci.apache.org/projects/flink/flink-statefun-docs-master/sdk/modules.html#remote-module) is pertinent.
51
52#![deny(missing_docs)]
53
54use std::collections::HashMap;
55use std::fmt::{Display, Formatter};
56use std::time::Duration;
57
58use protobuf::well_known_types::Any;
59use protobuf::Message;
60
61pub use function_registry::FunctionRegistry;
62use statefun_proto::http_function::Address as ProtoAddress;
63
64mod function_registry;
65mod invocation_bridge;
66pub mod io;
67pub mod transport;
68
69/// Context for a single invocation of a stateful function.
70///
71/// The context may be used to obtain the [Address](Address) of the function of the current
72/// invocation or the calling function (if the function was invoked by another function), or to
73/// access state.
74#[derive(Debug)]
75pub struct Context<'a> {
76 state: &'a HashMap<&'a str, &'a [u8]>,
77 self_address: &'a ProtoAddress,
78 caller_address: &'a ProtoAddress,
79}
80
81impl<'a> Context<'a> {
82 fn new(
83 state: &'a HashMap<&str, &[u8]>,
84 self_address: &'a ProtoAddress,
85 caller_address: &'a ProtoAddress,
86 ) -> Self {
87 Context {
88 state,
89 self_address,
90 caller_address,
91 }
92 }
93
94 /// Returns the [Address](Address) of the stateful function that is being called. This is the
95 /// statefun equivalent of `self`.
96 pub fn self_address(&self) -> Address {
97 Address::from_proto(self.self_address)
98 }
99
100 /// Returns the [Address](Address) of the stateful function that caused this function
101 /// invocation, that is, the caller.
102 pub fn caller_address(&self) -> Address {
103 Address::from_proto(self.caller_address)
104 }
105
106 /// Returns the state (or persisted) value that previous invocations of this stateful function
107 /// might have persisted under the given name.
108 pub fn get_state<T: Message>(&self, name: &str) -> Option<T> {
109 let state = self.state.get(name);
110 state.and_then(|serialized_state| {
111 let packed_state: Any =
112 protobuf::parse_from_bytes(serialized_state).expect("Could not deserialize state.");
113
114 log::debug!("Packed state for {}: {:?}", name, packed_state);
115
116 let unpacked_state: Option<T> = packed_state
117 .unpack()
118 .expect("Could not unpack state from Any.");
119
120 unpacked_state
121 })
122 }
123}
124
125/// The unique identity of an individual stateful function.
126///
127/// This comprises the function's `FunctionType` and an unique identifier within the
128/// type. The function's type denotes the class of function to invoke, while the unique identifier
129/// addresses the invocation to a specific function instance.
130///
131/// This must be used when sending messages to stateful functions as part of the function
132/// [Effects](Effects).
133#[derive(Debug)]
134pub struct Address {
135 /// `FunctionType` of the stateful function that this `Address` refers to.
136 pub function_type: FunctionType,
137
138 /// Unique id of the stateful function that this `Address` refers to.
139 pub id: String,
140}
141
142impl Display for Address {
143 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
144 write!(f, "Address {}/{}", self.function_type, self.id)
145 }
146}
147
148impl Address {
149 /// Creates a new `Address` from the given `FunctionType` and id.
150 pub fn new(function_type: FunctionType, id: &str) -> Self {
151 Address {
152 function_type,
153 id: id.to_owned(),
154 }
155 }
156
157 /// Converts the Protobuf `Address` into an `Address`. We don't implement `From`/`Into` for this
158 /// because we want to keep it out of the public API.
159 fn from_proto(proto_address: &ProtoAddress) -> Self {
160 Address {
161 function_type: FunctionType::new(
162 proto_address.get_namespace(),
163 proto_address.get_field_type(),
164 ),
165 id: proto_address.get_id().to_owned(),
166 }
167 }
168
169 /// Converts this `Address` into a Protobuf `Address`. We don't implement `From`/`Into` for this
170 /// because we want to keep it out of the public API.
171 fn into_proto(self) -> ProtoAddress {
172 let mut result = ProtoAddress::new();
173 result.set_namespace(self.function_type.namespace);
174 result.set_field_type(self.function_type.name);
175 result.set_id(self.id);
176 result
177 }
178}
179
180/// A reference to a stateful function, consisting of a namespace and a name.
181///
182/// A function's type is part of a function's [Address](Address) and serves as integral part of an
183/// individual function's identity.
184#[derive(PartialEq, Eq, Hash, Debug)]
185pub struct FunctionType {
186 namespace: String,
187 name: String,
188}
189
190impl FunctionType {
191 /// Creates a new `FunctionType` from the given namespace and name.
192 pub fn new(namespace: &str, name: &str) -> FunctionType {
193 FunctionType {
194 namespace: namespace.to_string(),
195 name: name.to_string(),
196 }
197 }
198}
199
200impl Display for FunctionType {
201 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
202 write!(f, "FunctionType {}/{}", self.namespace, self.name)
203 }
204}
205
206/// Effects (or side effects) of a stateful function invocation.
207///
208/// This can be used to:
209/// - send messages ourselves or other stateful functions
210/// - send messages to an egress
211/// - update the state of this stateful function, which will be available on future invocations
212#[derive(Default, Debug)]
213pub struct Effects {
214 invocations: Vec<(Address, Any)>,
215 delayed_invocations: Vec<(Address, Duration, Any)>,
216 egress_messages: Vec<(EgressIdentifier, Any)>,
217 state_updates: Vec<StateUpdate>,
218}
219
220impl Effects {
221 /// Creates a new empty `Effects`.
222 pub fn new() -> Effects {
223 Effects {
224 invocations: Vec::new(),
225 delayed_invocations: Vec::new(),
226 egress_messages: Vec::new(),
227 state_updates: Vec::new(),
228 }
229 }
230
231 /// Sends a message to the stateful function identified by the address.
232 pub fn send<M: Message>(&mut self, address: Address, message: M) {
233 let packed_message = Any::pack(&message).unwrap();
234 self.invocations.push((address, packed_message));
235 }
236
237 /// Sends a message to the stateful function identified by the address after a delay.
238 pub fn send_after<M: Message>(&mut self, address: Address, delay: Duration, message: M) {
239 let packed_message = Any::pack(&message).unwrap();
240 self.delayed_invocations
241 .push((address, delay, packed_message));
242 }
243
244 /// Sends a message to the egress identifier by the `EgressIdentifier`.
245 pub fn egress<M: Message>(&mut self, identifier: EgressIdentifier, message: M) {
246 let packed_message = Any::pack(&message).unwrap();
247 self.egress_messages.push((identifier, packed_message));
248 }
249
250 /// Deletes the state kept under the given name.
251 pub fn delete_state(&mut self, name: &str) {
252 self.state_updates
253 .push(StateUpdate::Delete(name.to_owned()));
254 }
255
256 /// Updates the state stored under the given name to the given value.
257 pub fn update_state<T: Message>(&mut self, name: &str, value: &T) {
258 self.state_updates.push(StateUpdate::Update(
259 name.to_owned(),
260 Any::pack(value).expect("Could not pack state update."),
261 ));
262 }
263}
264
265#[derive(Debug)]
266enum StateUpdate {
267 Update(String, Any),
268 Delete(String),
269}
270
271/// A reference to an _egress_, consisting of a namespace and a name.
272///
273/// This has to be used when sending messages to an egress as part of the function
274/// [Effects](Effects).
275#[derive(Debug)]
276pub struct EgressIdentifier {
277 namespace: String,
278 name: String,
279}
280
281impl EgressIdentifier {
282 /// Creates a new `EgressIdentifier` from the given namespace and name.
283 pub fn new(namespace: &str, name: &str) -> EgressIdentifier {
284 EgressIdentifier {
285 namespace: namespace.to_string(),
286 name: name.to_string(),
287 }
288 }
289}
290
291impl Display for EgressIdentifier {
292 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
293 write!(f, "EgressIdentifier {}/{}", self.namespace, self.name)
294 }
295}