picodata_plugin/interplay/
mod.rs

1//! Communication between `picodata` runtime and any third party runtimes such as `tokio`,
2//! `smol` or even your custom runtime.
3//! This module provides two API's:
4//! - low level `cbus` API - with raw channels between custom runtime and `picodata` runtime
5//! - high level `tros` API - with the ability to execute async tasks in third party async runtimes
6//! and wait for execution results.
7//! You should prefer a second one.
8//! The usage of cbus API at one's own risk
9//! as there are some pitfalls unbeknownst even to the authors of the runtime.
10//!
11//! # Problem
12//!
13//! `Picodata` uses its own asynchronous runtime, and all plugin code is executed via this runtime.
14//! You can see more in [documentation](https://docs.binary.picodata.io/tarantool/en/reference/reference_lua/fiber.html).
15//! If you want to use any other runtimes, you need a way to communicate
16//! with them from `picodata` runtime; that's why `interplay` module exists.
17//!
18//! # Tros
19//!
20//! Tros is a high-level bridge between `picodata` and external runtimes.
21//! With `tros`, you don't care about the complexity of interaction -
22//! you don't even use channels directly.
23//! It hides all details and provides a simplified API.
24//!
25//! Here is how to do async http request with `tros`:
26//!
27//! ```no_run
28//! use picodata_plugin::interplay::tros;
29//! use picodata_plugin::system::tarantool::fiber;
30//!
31//! let text = tros::TokioExecutor::new(tros::transport::PicodataTransport::default())
32//!             .exec(async { reqwest::get("http://example.com").await?.text().await })
33//!             .unwrap()
34//!             .unwrap();
35//! assert!(text.contains("Example Domain"))
36//! ```
37//!
38//! # Cbus
39//!
40//! When you communicate with `tokio`, or third party system thread, or any other runtime,
41//! there are two questions that need to be answered:
42//! 1) how to send data from `picodata` plugin into this runtime?
43//! 2) how to receive data from third party runtime into `picodata` plugin?
44//!
45//! The answer to the first question is pretty straightforward, you can use channels
46//! that are provided by third party runtime.
47//! For example, if you send data into `tokio` runtime,
48//! just use [tokio channels](https://tokio.rs/tokio/tutorial/channels).
49//!
50//! But this solution doesn't help in the second question, because `tokio` channels don't know anything
51//! about `picodata` runtime.
52//! It matters because when you try to receive data in picodata plugin code,
53//! `picodata` runtime should know it and suspend current fiber.
54//! Then wake up it when data is ready.
55//! That's why we need special channels which will allow working in completely
56//! asynchronous mode - `cbus` channels.
57//!
58//! Check [`cbus`](https://github.com/picodata/tarantool-module/blob/master/tarantool/src/cbus/mod.rs)
59//! documentation if you want to know internals.
60//!
61//! Note that `cbus` is more low-level tool than `tros`, so use it may be more complex than
62//! you expect.
63//! We recommend to use `cbus` API for stream data processing or similar tasks.
64//! Otherwise, we recommend trying to use `tros` API due to its simplicity.
65//!
66//! Here we will give a description of the `cbus` channels:
67//! - [`cbus::oneshot::channel`] - provide the oneshot channels, using these channels you can send
68//! data just once.
69//! - [`cbus::unbounded::channel`] - provide unbounded channels, using these channels you can
70//! send data any number of times.
71//! - [`cbus::sync::std::channel`] - provide sync std channels, this channels similar to unbounded
72//! but when you call `send` function from std thread it returns only when `picodata` plugin code
73//! receive data.
74//! - [`cbus::sync::tokio::channel`] - provide sync `tokio` channels, this channels is similar to unbounded one
75//! but when you call `send` function from `tokio` task it returns only when `picodata` plugin code
76//! receive data.
77//!
78//! ## Cbus example
79//!
80//! Let's try to solve a little task - we need to parse multiple files.
81//! Imagine we have files with big json encoded objects in it.
82//! We want to parse files in parallel because the number of files is huge, and we would like to
83//! get the result as quickly as possible.
84//! And we would additionally like to free up the `picodata` runtime from the heavy task of parsing,
85//! because open and parse file is blocking operations that may "lock" `picodata`
86//! runtime for a long time.
87//!
88//! Okay, now let's solve it.
89//! We will use a separate threadpool, to parse file, threads will
90//! receive parsing tasks (file names) and send parsed data back
91//! to `picodata` runtime (using `cbus` unbounded channel).
92//!
93//! First, create a `parse` function.
94//! This function will open a file, parse data, and send a result using an unbounded channel.
95//!
96//! ```no_run
97//! use std::{fs, path::Path};
98//! use picodata_plugin::interplay::cbus;
99//!
100//! fn parse(file: &Path, tx: cbus::unbounded::Sender<serde_json::Value>) {
101//!     let raw_json = fs::read_to_string(file).unwrap();
102//!     let object = serde_json::from_str(&raw_json).unwrap();
103//!     tx.send(object).unwrap();
104//! }
105//! ```
106//!
107//! Second, lets create a function for parse a list of files that may be used in `picodata` runtime.
108//!
109//! ```no_run
110//! # use std::{fs, path::Path};
111//! # fn parse(file: &Path, tx: unbounded::Sender<serde_json::Value>) {
112//! #     let raw_json = fs::read_to_string(file).unwrap();
113//! #     let object = serde_json::from_str(&raw_json).unwrap();
114//! #     tx.send(object).unwrap();
115//! # }
116//! use std::{path::PathBuf, sync::OnceLock};
117//! use picodata_plugin::interplay::channel::unbounded;
118//! use threadpool::ThreadPool;
119//!
120//! static THREAD_POOL: OnceLock<ThreadPool> = OnceLock::new();
121//!
122//! fn parse_files(files: &[PathBuf]) -> Vec<serde_json::Value> {
123//!     let (tx, rx) = unbounded::channel();
124//!     for file in files {
125//!         let file = file.clone();
126//!         let tx = tx.clone();
127//!         THREAD_POOL
128//!             .get_or_init(|| ThreadPool::new(16))
129//!             .execute(move || parse(&file, tx));
130//!     }
131//!
132//!     let mut result = Vec::with_capacity(files.len());
133//!     while let Ok(object) = rx.receive() {
134//!         result.push(object)
135//!     };
136//!     result
137//! }
138//! ```
139
140pub use tarantool::cbus;
141pub use tros;
142
143pub mod channel {
144    /// ***For internal usage***
145    pub const DEFAULT_CBUS_ENDPOINT: &str = "picodata-channels";
146
147    pub mod oneshot {
148        use crate::interplay::channel::DEFAULT_CBUS_ENDPOINT;
149        pub use tarantool::cbus::oneshot::{EndpointReceiver, Sender};
150
151        /// Creates a new oneshot channel, returning the sender/receiver halves.
152        ///
153        /// See [`tarantool::cbus::oneshot::channel`] for more.
154        pub fn channel<T>() -> (Sender<T>, EndpointReceiver<T>) {
155            tarantool::cbus::oneshot::channel(DEFAULT_CBUS_ENDPOINT)
156        }
157    }
158
159    pub mod unbounded {
160        use crate::interplay::channel::DEFAULT_CBUS_ENDPOINT;
161        pub use tarantool::cbus::unbounded::{EndpointReceiver, Sender};
162
163        /// Creates a new unbounded channel, returning the sender/receiver halves.
164        ///
165        /// See [`tarantool::cbus::unbounded::channel`] for more.
166        pub fn channel<T>() -> (Sender<T>, EndpointReceiver<T>) {
167            tarantool::cbus::unbounded::channel(DEFAULT_CBUS_ENDPOINT)
168        }
169    }
170
171    pub mod sync {
172        pub mod std {
173            use crate::interplay::channel::DEFAULT_CBUS_ENDPOINT;
174            use std::num::NonZeroUsize;
175            pub use tarantool::cbus::sync::std::{EndpointReceiver, Sender};
176
177            /// Creates a new synchronous channel for communication with std threads,
178            /// returning the sender/receiver halves.
179            ///
180            /// See [`tarantool::cbus::sync::std::channel`] for more.
181            pub fn channel<T>(cap: NonZeroUsize) -> (Sender<T>, EndpointReceiver<T>) {
182                tarantool::cbus::sync::std::channel(DEFAULT_CBUS_ENDPOINT, cap)
183            }
184        }
185
186        pub mod tokio {
187            use crate::interplay::channel::DEFAULT_CBUS_ENDPOINT;
188            use std::num::NonZeroUsize;
189            pub use tarantool::cbus::sync::tokio::{EndpointReceiver, Sender};
190
191            /// Creates a new synchronous channel for communication with tokio runtime,
192            /// returning the sender/receiver halves.
193            ///
194            /// See [`tarantool::cbus::sync::tokio::channel`] for more.
195            pub fn channel<T>(cap: NonZeroUsize) -> (Sender<T>, EndpointReceiver<T>) {
196                tarantool::cbus::sync::tokio::channel(DEFAULT_CBUS_ENDPOINT, cap)
197            }
198        }
199    }
200}