picodata_plugin/interplay/mod.rs
1//! Communication between `picodata` runtime and any third party runtimes such as `tokio`,
2//! `smol` or even your custom runtime.
3//! This module provides two API's:
4//! - low level `cbus` API - with raw channels between custom runtime and `picodata` runtime
5//! - high level `tros` API - with the ability to execute async tasks in third party async runtimes
6//! and wait for execution results.
7//!
8//! You should prefer a second one.
9//! The usage of cbus API at one's own risk
10//! as there are some pitfalls unbeknownst even to the authors of the runtime.
11//!
12//! # Problem
13//!
14//! `Picodata` uses its own asynchronous runtime, and all plugin code is executed via this runtime.
15//! You can see more in [documentation](https://docs.binary.picodata.io/tarantool/en/reference/reference_lua/fiber.html).
16//! If you want to use any other runtimes, you need a way to communicate
17//! with them from `picodata` runtime; that's why `interplay` module exists.
18//!
19//! # Tros
20//!
21//! Tros is a high-level bridge between `picodata` and external runtimes.
22//! With `tros`, you don't care about the complexity of interaction -
23//! you don't even use channels directly.
24//! It hides all details and provides a simplified API.
25//!
26//! Here is how to do async http request with `tros`:
27//!
28//! ```no_run
29//! use picodata_plugin::interplay::tros;
30//! use picodata_plugin::system::tarantool::fiber;
31//!
32//! let text = tros::TokioExecutor::new(tros::transport::PicodataTransport::default())
33//! .exec(async { reqwest::get("http://example.com").await?.text().await })
34//! .unwrap()
35//! .unwrap();
36//! assert!(text.contains("Example Domain"))
37//! ```
38//!
39//! # Cbus
40//!
41//! When you communicate with `tokio`, or third party system thread, or any other runtime,
42//! there are two questions that need to be answered:
43//! 1) how to send data from `picodata` plugin into this runtime?
44//! 2) how to receive data from third party runtime into `picodata` plugin?
45//!
46//! The answer to the first question is pretty straightforward, you can use channels
47//! that are provided by third party runtime.
48//! For example, if you send data into `tokio` runtime,
49//! just use [tokio channels](https://tokio.rs/tokio/tutorial/channels).
50//!
51//! But this solution doesn't help in the second question, because `tokio` channels don't know anything
52//! about `picodata` runtime.
53//! It matters because when you try to receive data in picodata plugin code,
54//! `picodata` runtime should know it and suspend current fiber.
55//! Then wake up it when data is ready.
56//! That's why we need special channels which will allow working in completely
57//! asynchronous mode - `cbus` channels.
58//!
59//! Check [`cbus`](https://github.com/picodata/tarantool-module/blob/master/tarantool/src/cbus/mod.rs)
60//! documentation if you want to know internals.
61//!
62//! Note that `cbus` is more low-level tool than `tros`, so use it may be more complex than
63//! you expect.
64//! We recommend to use `cbus` API for stream data processing or similar tasks.
65//! Otherwise, we recommend trying to use `tros` API due to its simplicity.
66//!
67//! Here we will give a description of the `cbus` channels:
68//! - [`cbus::oneshot::channel`] - provide the oneshot channels, using these channels you can send
69//! data just once.
70//! - [`cbus::unbounded::channel`] - provide unbounded channels, using these channels you can
71//! send data any number of times.
72//! - [`cbus::sync::std::channel`] - provide sync std channels, this channels similar to unbounded
73//! but when you call `send` function from std thread it returns only when `picodata` plugin code
74//! receive data.
75//! - [`cbus::sync::tokio::channel`] - provide sync `tokio` channels, this channels is similar to unbounded one
76//! but when you call `send` function from `tokio` task it returns only when `picodata` plugin code
77//! receive data.
78//!
79//! ## Cbus example
80//!
81//! Let's try to solve a little task - we need to parse multiple files.
82//! Imagine we have files with big json encoded objects in it.
83//! We want to parse files in parallel because the number of files is huge, and we would like to
84//! get the result as quickly as possible.
85//! And we would additionally like to free up the `picodata` runtime from the heavy task of parsing,
86//! because open and parse file is blocking operations that may "lock" `picodata`
87//! runtime for a long time.
88//!
89//! Okay, now let's solve it.
90//! We will use a separate threadpool, to parse file, threads will
91//! receive parsing tasks (file names) and send parsed data back
92//! to `picodata` runtime (using `cbus` unbounded channel).
93//!
94//! First, create a `parse` function.
95//! This function will open a file, parse data, and send a result using an unbounded channel.
96//!
97//! ```no_run
98//! use std::{fs, path::Path};
99//! use picodata_plugin::interplay::cbus;
100//!
101//! fn parse(file: &Path, tx: cbus::unbounded::Sender<serde_json::Value>) {
102//! let raw_json = fs::read_to_string(file).unwrap();
103//! let object = serde_json::from_str(&raw_json).unwrap();
104//! tx.send(object).unwrap();
105//! }
106//! ```
107//!
108//! Second, lets create a function for parse a list of files that may be used in `picodata` runtime.
109//!
110//! ```no_run
111//! # use std::{fs, path::Path};
112//! # fn parse(file: &Path, tx: unbounded::Sender<serde_json::Value>) {
113//! # let raw_json = fs::read_to_string(file).unwrap();
114//! # let object = serde_json::from_str(&raw_json).unwrap();
115//! # tx.send(object).unwrap();
116//! # }
117//! use std::{path::PathBuf, sync::OnceLock};
118//! use picodata_plugin::interplay::channel::unbounded;
119//! use threadpool::ThreadPool;
120//!
121//! static THREAD_POOL: OnceLock<ThreadPool> = OnceLock::new();
122//!
123//! fn parse_files(files: &[PathBuf]) -> Vec<serde_json::Value> {
124//! let (tx, rx) = unbounded::channel();
125//! for file in files {
126//! let file = file.clone();
127//! let tx = tx.clone();
128//! THREAD_POOL
129//! .get_or_init(|| ThreadPool::new(16))
130//! .execute(move || parse(&file, tx));
131//! }
132//!
133//! let mut result = Vec::with_capacity(files.len());
134//! while let Ok(object) = rx.receive() {
135//! result.push(object)
136//! };
137//! result
138//! }
139//! ```
140
141pub use tarantool::cbus;
142pub use tros;
143
144pub mod channel {
145 /// ***For internal usage***
146 pub const DEFAULT_CBUS_ENDPOINT: &str = "picodata-channels";
147
148 pub mod oneshot {
149 use crate::interplay::channel::DEFAULT_CBUS_ENDPOINT;
150 pub use tarantool::cbus::oneshot::{EndpointReceiver, Sender};
151
152 /// Creates a new oneshot channel, returning the sender/receiver halves.
153 ///
154 /// See [`tarantool::cbus::oneshot::channel`] for more.
155 pub fn channel<T>() -> (Sender<T>, EndpointReceiver<T>) {
156 tarantool::cbus::oneshot::channel(DEFAULT_CBUS_ENDPOINT)
157 }
158 }
159
160 pub mod unbounded {
161 use crate::interplay::channel::DEFAULT_CBUS_ENDPOINT;
162 pub use tarantool::cbus::unbounded::{EndpointReceiver, Sender};
163
164 /// Creates a new unbounded channel, returning the sender/receiver halves.
165 ///
166 /// See [`tarantool::cbus::unbounded::channel`] for more.
167 pub fn channel<T>() -> (Sender<T>, EndpointReceiver<T>) {
168 tarantool::cbus::unbounded::channel(DEFAULT_CBUS_ENDPOINT)
169 }
170 }
171
172 pub mod sync {
173 pub mod std {
174 use crate::interplay::channel::DEFAULT_CBUS_ENDPOINT;
175 use std::num::NonZeroUsize;
176 pub use tarantool::cbus::sync::std::{EndpointReceiver, Sender};
177
178 /// Creates a new synchronous channel for communication with std threads,
179 /// returning the sender/receiver halves.
180 ///
181 /// See [`tarantool::cbus::sync::std::channel`] for more.
182 pub fn channel<T>(cap: NonZeroUsize) -> (Sender<T>, EndpointReceiver<T>) {
183 tarantool::cbus::sync::std::channel(DEFAULT_CBUS_ENDPOINT, cap)
184 }
185 }
186
187 pub mod tokio {
188 use crate::interplay::channel::DEFAULT_CBUS_ENDPOINT;
189 use std::num::NonZeroUsize;
190 pub use tarantool::cbus::sync::tokio::{EndpointReceiver, Sender};
191
192 /// Creates a new synchronous channel for communication with tokio runtime,
193 /// returning the sender/receiver halves.
194 ///
195 /// See [`tarantool::cbus::sync::tokio::channel`] for more.
196 pub fn channel<T>(cap: NonZeroUsize) -> (Sender<T>, EndpointReceiver<T>) {
197 tarantool::cbus::sync::tokio::channel(DEFAULT_CBUS_ENDPOINT, cap)
198 }
199 }
200 }
201}