picodata_plugin/interplay/mod.rs
//! Communication between the `picodata` runtime and third-party runtimes such as `tokio`,
//! `smol`, or even your own custom runtime.
//! This module provides two APIs:
//! - the low-level `cbus` API, with raw channels between a custom runtime and the `picodata` runtime;
//! - the high-level `tros` API, which can execute async tasks in third-party async runtimes
//!   and wait for their results.
//!
//! Prefer the second one.
//! Use the `cbus` API at your own risk,
//! as it has pitfalls unknown even to the authors of the runtime.
//!
//! # Problem
//!
//! `Picodata` uses its own asynchronous runtime, and all plugin code is executed by this runtime.
//! See the [fiber documentation](https://docs.binary.picodata.io/tarantool/en/reference/reference_lua/fiber.html) for details.
//! To use any other runtime, you need a way to communicate
//! with it from the `picodata` runtime; that is why the `interplay` module exists.
//!
//! # Tros
//!
//! Tros is a high-level bridge between `picodata` and external runtimes.
//! With `tros`, you don't need to deal with the complexity of the interaction;
//! you don't even use channels directly.
//! It hides the details behind a simplified API.
//!
//! Here is how to make an async HTTP request with `tros`:
//!
//! ```no_run
//! use picodata_plugin::interplay::tros;
//! use picodata_plugin::system::tarantool::fiber;
//!
//! // Run the future on the external runtime and wait for its output.
//! let text = tros::TokioExecutor::new(tros::transport::PicodataTransport::default())
//!     .exec(async { reqwest::get("http://example.com").await?.text().await })
//!     .unwrap() // result of `exec` itself
//!     .unwrap(); // result of the HTTP request returned by the async block
//! assert!(text.contains("Example Domain"))
//! ```
//!
//! # Cbus
//!
//! When you communicate with `tokio`, a third-party system thread, or any other runtime,
//! two questions need to be answered:
//! 1) How do you send data from a `picodata` plugin into that runtime?
//! 2) How do you receive data from the third-party runtime in a `picodata` plugin?
//!
//! The answer to the first question is straightforward: use the channels
//! provided by the third-party runtime.
//! For example, to send data into the `tokio` runtime,
//! just use [tokio channels](https://tokio.rs/tokio/tutorial/channels).
//!
//! This does not answer the second question, because `tokio` channels know nothing
//! about the `picodata` runtime.
//! That matters because when plugin code waits for data,
//! the `picodata` runtime must know about it, suspend the current fiber,
//! and wake it up when the data is ready.
//! That is why we need special channels that work in a completely
//! asynchronous mode: `cbus` channels.
//!
//! See the [`cbus`](https://github.com/picodata/tarantool-module/blob/master/tarantool/src/cbus/mod.rs)
//! documentation if you want to know the internals.
//!
//! Note that `cbus` is a lower-level tool than `tros`, so using it may be more complex than
//! you expect.
//! We recommend the `cbus` API for stream data processing and similar tasks.
//! Otherwise, we recommend the `tros` API for its simplicity.
//!
//! The `cbus` channels are:
//! - [`cbus::oneshot::channel`] provides oneshot channels; you can send
//!   data through such a channel only once.
//! - [`cbus::unbounded::channel`] provides unbounded channels; you can
//!   send data through such a channel any number of times.
//! - [`cbus::sync::std::channel`] provides sync std channels; they are similar to unbounded ones,
//!   but a `send` call from a std thread returns only when `picodata` plugin code
//!   receives the data.
//! - [`cbus::sync::tokio::channel`] provides sync `tokio` channels; they are similar to unbounded ones,
//!   but a `send` call from a `tokio` task returns only when `picodata` plugin code
//!   receives the data.
//!
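//! The practical difference between unbounded and sync channels is backpressure.
//! The sketch below is only an analogy: it uses `std::sync::mpsc` channels rather than
//! `cbus` ones, and the helper names `fill_unbounded` and `fill_bounded` are hypothetical.
//! It shows that an unbounded channel accepts any number of items without a reader,
//! while a bounded channel stops accepting items once its capacity is reached.
//!
//! ```rust
//! use std::sync::mpsc::{channel, sync_channel, TrySendError};
//!
//! /// Sends `n` items into an unbounded std channel that nobody reads from
//! /// and returns how many sends succeeded.
//! fn fill_unbounded(n: usize) -> usize {
//!     // `_rx` keeps the receiving half alive so that `send` does not fail.
//!     let (tx, _rx) = channel::<usize>();
//!     (0..n).filter(|i| tx.send(*i).is_ok()).count()
//! }
//!
//! /// Tries to send `n` items into a bounded std channel of capacity `cap`
//! /// that nobody reads from and returns how many items were accepted.
//! fn fill_bounded(cap: usize, n: usize) -> usize {
//!     let (tx, _rx) = sync_channel::<usize>(cap);
//!     let mut accepted = 0;
//!     for i in 0..n {
//!         match tx.try_send(i) {
//!             Ok(()) => accepted += 1,
//!             // The buffer is full: a blocking `send` would wait here.
//!             Err(TrySendError::Full(_)) => break,
//!             Err(TrySendError::Disconnected(_)) => break,
//!         }
//!     }
//!     accepted
//! }
//! ```
//!
//! With a bounded channel the producer cannot run ahead of the consumer by more than
//! the capacity, which is useful when the consumer is the slower side.
//!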
//! ## Cbus example
//!
//! Let's solve a small task: parsing multiple files.
//! Imagine we have files that contain large JSON-encoded objects.
//! We want to parse the files in parallel because there are many of them, and we would like to
//! get the result as quickly as possible.
//! We would also like to relieve the `picodata` runtime of the heavy parsing work,
//! because opening and parsing a file are blocking operations that may "lock" the `picodata`
//! runtime for a long time.
//!
//! Now let's solve it.
//! We will use a separate thread pool to parse the files: its threads will
//! receive parsing tasks (file names) and send the parsed data back
//! to the `picodata` runtime (using a `cbus` unbounded channel).
//!
//! First, create a `parse` function.
//! This function opens a file, parses its contents, and sends the result through an unbounded channel.
//!
//! ```no_run
//! use std::{fs, path::Path};
//! use picodata_plugin::interplay::cbus;
//!
//! fn parse(file: &Path, tx: cbus::unbounded::Sender<serde_json::Value>) {
//!     let raw_json = fs::read_to_string(file).unwrap();
//!     let object = serde_json::from_str(&raw_json).unwrap();
//!     tx.send(object).unwrap();
//! }
//! ```
//!
//! Second, let's create a function that parses a list of files and can be called from the `picodata` runtime.
//!
//! ```no_run
//! # use std::{fs, path::Path};
//! # fn parse(file: &Path, tx: unbounded::Sender<serde_json::Value>) {
//! #     let raw_json = fs::read_to_string(file).unwrap();
//! #     let object = serde_json::from_str(&raw_json).unwrap();
//! #     tx.send(object).unwrap();
//! # }
//! use std::{path::PathBuf, sync::OnceLock};
//! use picodata_plugin::interplay::channel::unbounded;
//! use threadpool::ThreadPool;
//!
//! static THREAD_POOL: OnceLock<ThreadPool> = OnceLock::new();
//!
//! fn parse_files(files: &[PathBuf]) -> Vec<serde_json::Value> {
//!     let (tx, rx) = unbounded::channel();
//!     for file in files {
//!         let file = file.clone();
//!         let tx = tx.clone();
//!         THREAD_POOL
//!             .get_or_init(|| ThreadPool::new(16))
//!             .execute(move || parse(&file, tx));
//!     }
//!     // Drop the original sender so that the channel disconnects once
//!     // every worker has finished; otherwise the loop below never ends.
//!     drop(tx);
//!
//!     let mut result = Vec::with_capacity(files.len());
//!     while let Ok(object) = rx.receive() {
//!         result.push(object);
//!     }
//!     result
//! }
//! ```
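//!
//! A receive loop like the one above finishes only when every sender has been dropped,
//! so the number of live senders determines when collection stops.
//! The sketch below shows the same fan-out and collect pattern with `std::sync::mpsc`
//! and plain threads instead of `cbus` and a thread pool; it is an analogy under that
//! assumption, and the function name `square_all` is hypothetical.
//!
//! ```rust
//! use std::sync::mpsc;
//! use std::thread;
//!
//! /// Squares every input on its own thread and collects the results in sorted order.
//! fn square_all(inputs: &[u64]) -> Vec<u64> {
//!     let (tx, rx) = mpsc::channel();
//!     for &n in inputs {
//!         // Each worker owns its own clone of the sender.
//!         let tx = tx.clone();
//!         thread::spawn(move || {
//!             tx.send(n * n).unwrap();
//!         });
//!     }
//!     // Drop the original sender: while it is alive, `recv` never reports
//!     // disconnection and the loop below would block forever.
//!     drop(tx);
//!
//!     let mut result = Vec::with_capacity(inputs.len());
//!     while let Ok(value) = rx.recv() {
//!         result.push(value);
//!     }
//!     // Workers finish in arbitrary order, so sort for a stable result.
//!     result.sort_unstable();
//!     result
//! }
//! ```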

pub use tarantool::cbus;
pub use tros;

pub mod channel {
    /// ***For internal usage***
    pub const DEFAULT_CBUS_ENDPOINT: &str = "picodata-channels";

    pub mod oneshot {
        use crate::interplay::channel::DEFAULT_CBUS_ENDPOINT;
        pub use tarantool::cbus::oneshot::{EndpointReceiver, Sender};

        /// Creates a new oneshot channel, returning the sender/receiver halves.
        ///
        /// See [`tarantool::cbus::oneshot::channel`] for more.
        pub fn channel<T>() -> (Sender<T>, EndpointReceiver<T>) {
            tarantool::cbus::oneshot::channel(DEFAULT_CBUS_ENDPOINT)
        }
    }

    pub mod unbounded {
        use crate::interplay::channel::DEFAULT_CBUS_ENDPOINT;
        pub use tarantool::cbus::unbounded::{EndpointReceiver, Sender};

        /// Creates a new unbounded channel, returning the sender/receiver halves.
        ///
        /// See [`tarantool::cbus::unbounded::channel`] for more.
        pub fn channel<T>() -> (Sender<T>, EndpointReceiver<T>) {
            tarantool::cbus::unbounded::channel(DEFAULT_CBUS_ENDPOINT)
        }
    }

    pub mod sync {
        pub mod std {
            use crate::interplay::channel::DEFAULT_CBUS_ENDPOINT;
            use std::num::NonZeroUsize;
            pub use tarantool::cbus::sync::std::{EndpointReceiver, Sender};

            /// Creates a new synchronous channel for communication with std threads,
            /// returning the sender/receiver halves.
            ///
            /// See [`tarantool::cbus::sync::std::channel`] for more.
            pub fn channel<T>(cap: NonZeroUsize) -> (Sender<T>, EndpointReceiver<T>) {
                tarantool::cbus::sync::std::channel(DEFAULT_CBUS_ENDPOINT, cap)
            }
        }

        pub mod tokio {
            use crate::interplay::channel::DEFAULT_CBUS_ENDPOINT;
            use std::num::NonZeroUsize;
            pub use tarantool::cbus::sync::tokio::{EndpointReceiver, Sender};

            /// Creates a new synchronous channel for communication with tokio runtime,
            /// returning the sender/receiver halves.
            ///
            /// See [`tarantool::cbus::sync::tokio::channel`] for more.
            pub fn channel<T>(cap: NonZeroUsize) -> (Sender<T>, EndpointReceiver<T>) {
                tarantool::cbus::sync::tokio::channel(DEFAULT_CBUS_ENDPOINT, cap)
            }
        }
    }
}