Skip to main content

statefun_sdk/
lib.rs

1//! An SDK for writing "stateful functions" in Rust. For use with
2//! [Apache Flink Stateful Functions](https://flink.apache.org/stateful-functions.html) (Statefun).
3//!
4//! # Examples
5//!
6//! The following shows how to write a simple stateful function and serve it for use in a Statefun
7//! deployment.
8//!
9//! ```no_run
10//! use protobuf::well_known_types::StringValue;
11//!
12//! use statefun_sdk::io::kafka;
13//! use statefun_sdk::transport::hyper::HyperHttpTransport;
14//! use statefun_sdk::transport::Transport;
15//! use statefun_sdk::{Address, Context, Effects, EgressIdentifier, FunctionRegistry, FunctionType};
16//!
17//! let mut function_registry = FunctionRegistry::new();
18//!
19//! function_registry.register_fn(
20//!     FunctionType::new("example", "function1"),
21//!     |context, message: StringValue| {
22//!         let mut effects = Effects::new();
23//!
24//!         effects.send(
25//!             Address::new(FunctionType::new("example", "function2"), "doctor"),
26//!             message,
27//!         );
28//!
29//!         effects
30//!     },
31//! );
32//!
33//! let hyper_transport = HyperHttpTransport::new("0.0.0.0:5000".parse()?);
34//! hyper_transport.run(function_registry)?;
35//!
36//! # Ok::<(), failure::Error>(())
37//! ```
38//!
39//! The program creates a [FunctionRegistry](crate::FunctionRegistry), which can be used to
40//! register one or more functions. Then we register a closure as a stateful function. Finally,
41//! we need to create a [Transport](crate::transport::Transport), in this case the
42//! [HyperHttpTransport](crate::transport::hyper::HyperHttpTransport) to serve our stateful
43//! function.
44//!
45//! Not that you can also use a function instead of a closure when registering functions.
46//!
47//! Refer to the Stateful Functions
48//! [documentation](https://ci.apache.org/projects/flink/flink-statefun-docs-master/) to learn how
49//! to use this in a deployment. Especially the
50//! [modules documentation](https://ci.apache.org/projects/flink/flink-statefun-docs-master/sdk/modules.html#remote-module) is pertinent.
51
52#![deny(missing_docs)]
53
54use std::collections::HashMap;
55use std::fmt::{Display, Formatter};
56use std::time::Duration;
57
58use protobuf::well_known_types::Any;
59use protobuf::Message;
60
61pub use function_registry::FunctionRegistry;
62use statefun_proto::http_function::Address as ProtoAddress;
63
64mod function_registry;
65mod invocation_bridge;
66pub mod io;
67pub mod transport;
68
69/// Context for a single invocation of a stateful function.
70///
71/// The context may be used to obtain the [Address](Address) of the function of the current
72/// invocation or the calling function (if the function was invoked by another function), or to
73/// access state.
74#[derive(Debug)]
75pub struct Context<'a> {
76    state: &'a HashMap<&'a str, &'a [u8]>,
77    self_address: &'a ProtoAddress,
78    caller_address: &'a ProtoAddress,
79}
80
81impl<'a> Context<'a> {
82    fn new(
83        state: &'a HashMap<&str, &[u8]>,
84        self_address: &'a ProtoAddress,
85        caller_address: &'a ProtoAddress,
86    ) -> Self {
87        Context {
88            state,
89            self_address,
90            caller_address,
91        }
92    }
93
94    /// Returns the [Address](Address) of the stateful function that is being called. This is the
95    /// statefun equivalent of `self`.
96    pub fn self_address(&self) -> Address {
97        Address::from_proto(self.self_address)
98    }
99
100    /// Returns the [Address](Address) of the stateful function that caused this function
101    /// invocation, that is, the caller.
102    pub fn caller_address(&self) -> Address {
103        Address::from_proto(self.caller_address)
104    }
105
106    /// Returns the state (or persisted) value that previous invocations of this stateful function
107    /// might have persisted under the given name.
108    pub fn get_state<T: Message>(&self, name: &str) -> Option<T> {
109        let state = self.state.get(name);
110        state.and_then(|serialized_state| {
111            let packed_state: Any =
112                protobuf::parse_from_bytes(serialized_state).expect("Could not deserialize state.");
113
114            log::debug!("Packed state for {}: {:?}", name, packed_state);
115
116            let unpacked_state: Option<T> = packed_state
117                .unpack()
118                .expect("Could not unpack state from Any.");
119
120            unpacked_state
121        })
122    }
123}
124
125/// The unique identity of an individual stateful function.
126///
127/// This comprises the function's `FunctionType` and an unique identifier within the
128/// type. The function's type denotes the class of function to invoke, while the unique identifier
129/// addresses the invocation to a specific function instance.
130///
131/// This must be used when sending messages to stateful functions as part of the function
132/// [Effects](Effects).
133#[derive(Debug)]
134pub struct Address {
135    /// `FunctionType` of the stateful function that this `Address` refers to.
136    pub function_type: FunctionType,
137
138    /// Unique id of the stateful function that this `Address` refers to.
139    pub id: String,
140}
141
142impl Display for Address {
143    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
144        write!(f, "Address {}/{}", self.function_type, self.id)
145    }
146}
147
148impl Address {
149    /// Creates a new `Address` from the given `FunctionType` and id.
150    pub fn new(function_type: FunctionType, id: &str) -> Self {
151        Address {
152            function_type,
153            id: id.to_owned(),
154        }
155    }
156
157    /// Converts the Protobuf `Address` into an `Address`. We don't implement `From`/`Into` for this
158    /// because we want to keep it out of the public API.
159    fn from_proto(proto_address: &ProtoAddress) -> Self {
160        Address {
161            function_type: FunctionType::new(
162                proto_address.get_namespace(),
163                proto_address.get_field_type(),
164            ),
165            id: proto_address.get_id().to_owned(),
166        }
167    }
168
169    /// Converts this `Address` into a Protobuf `Address`. We don't implement `From`/`Into` for this
170    /// because we want to keep it out of the public API.
171    fn into_proto(self) -> ProtoAddress {
172        let mut result = ProtoAddress::new();
173        result.set_namespace(self.function_type.namespace);
174        result.set_field_type(self.function_type.name);
175        result.set_id(self.id);
176        result
177    }
178}
179
180/// A reference to a stateful function, consisting of a namespace and a name.
181///
182/// A function's type is part of a function's [Address](Address) and serves as integral part of an
183/// individual function's identity.
184#[derive(PartialEq, Eq, Hash, Debug)]
185pub struct FunctionType {
186    namespace: String,
187    name: String,
188}
189
190impl FunctionType {
191    /// Creates a new `FunctionType` from the given namespace and name.
192    pub fn new(namespace: &str, name: &str) -> FunctionType {
193        FunctionType {
194            namespace: namespace.to_string(),
195            name: name.to_string(),
196        }
197    }
198}
199
200impl Display for FunctionType {
201    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
202        write!(f, "FunctionType {}/{}", self.namespace, self.name)
203    }
204}
205
206/// Effects (or side effects) of a stateful function invocation.
207///
208/// This can be used to:
209///  - send messages ourselves or other stateful functions
210///  - send messages to an egress
211///  - update the state of this stateful function, which will be available on future invocations
212#[derive(Default, Debug)]
213pub struct Effects {
214    invocations: Vec<(Address, Any)>,
215    delayed_invocations: Vec<(Address, Duration, Any)>,
216    egress_messages: Vec<(EgressIdentifier, Any)>,
217    state_updates: Vec<StateUpdate>,
218}
219
220impl Effects {
221    /// Creates a new empty `Effects`.
222    pub fn new() -> Effects {
223        Effects {
224            invocations: Vec::new(),
225            delayed_invocations: Vec::new(),
226            egress_messages: Vec::new(),
227            state_updates: Vec::new(),
228        }
229    }
230
231    /// Sends a message to the stateful function identified by the address.
232    pub fn send<M: Message>(&mut self, address: Address, message: M) {
233        let packed_message = Any::pack(&message).unwrap();
234        self.invocations.push((address, packed_message));
235    }
236
237    /// Sends a message to the stateful function identified by the address after a delay.
238    pub fn send_after<M: Message>(&mut self, address: Address, delay: Duration, message: M) {
239        let packed_message = Any::pack(&message).unwrap();
240        self.delayed_invocations
241            .push((address, delay, packed_message));
242    }
243
244    /// Sends a message to the egress identifier by the `EgressIdentifier`.
245    pub fn egress<M: Message>(&mut self, identifier: EgressIdentifier, message: M) {
246        let packed_message = Any::pack(&message).unwrap();
247        self.egress_messages.push((identifier, packed_message));
248    }
249
250    /// Deletes the state kept under the given name.
251    pub fn delete_state(&mut self, name: &str) {
252        self.state_updates
253            .push(StateUpdate::Delete(name.to_owned()));
254    }
255
256    /// Updates the state stored under the given name to the given value.
257    pub fn update_state<T: Message>(&mut self, name: &str, value: &T) {
258        self.state_updates.push(StateUpdate::Update(
259            name.to_owned(),
260            Any::pack(value).expect("Could not pack state update."),
261        ));
262    }
263}
264
265#[derive(Debug)]
266enum StateUpdate {
267    Update(String, Any),
268    Delete(String),
269}
270
271/// A reference to an _egress_, consisting of a namespace and a name.
272///
273/// This has to be used when sending messages to an egress as part of the function
274/// [Effects](Effects).
275#[derive(Debug)]
276pub struct EgressIdentifier {
277    namespace: String,
278    name: String,
279}
280
281impl EgressIdentifier {
282    /// Creates a new `EgressIdentifier` from the given namespace and name.
283    pub fn new(namespace: &str, name: &str) -> EgressIdentifier {
284        EgressIdentifier {
285            namespace: namespace.to_string(),
286            name: name.to_string(),
287        }
288    }
289}
290
291impl Display for EgressIdentifier {
292    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
293        write!(f, "EgressIdentifier {}/{}", self.namespace, self.name)
294    }
295}