1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
//! Communication between `picodata` runtime and any third party runtimes such as `tokio`,
//! `smol` or even your custom runtime.
//! This module provides two API's:
//! - low level `cbus` API - with raw channels between custom runtime and `picodata` runtime
//! - high level `tros` API - with the ability to execute async tasks in third party async runtimes
//! and wait for execution results.
//!
//! You should prefer a second one.
//! The usage of cbus API at one's own risk
//! as there are some pitfalls unbeknownst even to the authors of the runtime.
//!
//! # Problem
//!
//! `Picodata` uses its own asynchronous runtime, and all plugin code is executed via this runtime.
//! You can see more in [documentation](https://docs.binary.picodata.io/tarantool/en/reference/reference_lua/fiber.html).
//! If you want to use any other runtimes, you need a way to communicate
//! with them from `picodata` runtime; that's why `interplay` module exists.
//!
//! # Tros
//!
//! Tros is a high-level bridge between `picodata` and external runtimes.
//! With `tros`, you don't care about the complexity of interaction -
//! you don't even use channels directly.
//! It hides all details and provides a simplified API.
//!
//! Here is how to do async http request with `tros`:
//!
//! ```no_compile
//! use tros;
//! use picodata_plugin::system::tarantool::fiber;
//!
//! let text = tros::TokioExecutor::new(tros::transport::PicodataTransport::default())
//! .exec(async { reqwest::get("http://example.com").await?.text().await })
//! .unwrap()
//! .unwrap();
//! assert!(text.contains("Example Domain"))
//! ```
//!
//! # Cbus
//!
//! When you communicate with `tokio`, or third party system thread, or any other runtime,
//! there are two questions that need to be answered:
//! 1) how to send data from `picodata` plugin into this runtime?
//! 2) how to receive data from third party runtime into `picodata` plugin?
//!
//! The answer to the first question is pretty straightforward, you can use channels
//! that are provided by third party runtime.
//! For example, if you send data into `tokio` runtime,
//! just use [tokio channels](https://tokio.rs/tokio/tutorial/channels).
//!
//! But this solution doesn't help in the second question, because `tokio` channels don't know anything
//! about `picodata` runtime.
//! It matters because when you try to receive data in picodata plugin code,
//! `picodata` runtime should know it and suspend current fiber.
//! Then wake up it when data is ready.
//! That's why we need special channels which will allow working in completely
//! asynchronous mode - `cbus` channels.
//!
//! Check [`cbus`](https://github.com/picodata/tarantool-module/blob/master/tarantool/src/cbus/mod.rs)
//! documentation if you want to know internals.
//!
//! Note that `cbus` is more low-level tool than `tros`, so use it may be more complex than
//! you expect.
//! We recommend to use `cbus` API for stream data processing or similar tasks.
//! Otherwise, we recommend trying to use `tros` API due to its simplicity.
//!
//! Here we will give a description of the `cbus` channels:
//! - [`cbus::oneshot::channel`] - provide the oneshot channels, using these channels you can send
//! data just once.
//! - [`cbus::unbounded::channel`] - provide unbounded channels, using these channels you can
//! send data any number of times.
//! - [`cbus::sync::std::channel`] - provide sync std channels, this channels similar to unbounded
//! but when you call `send` function from std thread it returns only when `picodata` plugin code
//! receive data.
//! - [`cbus::sync::tokio::channel`] - provide sync `tokio` channels, this channels is similar to unbounded one
//! but when you call `send` function from `tokio` task it returns only when `picodata` plugin code
//! receive data.
//!
//! ## Cbus example
//!
//! Let's try to solve a little task - we need to parse multiple files.
//! Imagine we have files with big json encoded objects in it.
//! We want to parse files in parallel because the number of files is huge, and we would like to
//! get the result as quickly as possible.
//! And we would additionally like to free up the `picodata` runtime from the heavy task of parsing,
//! because open and parse file is blocking operations that may "lock" `picodata`
//! runtime for a long time.
//!
//! Okay, now let's solve it.
//! We will use a separate threadpool, to parse file, threads will
//! receive parsing tasks (file names) and send parsed data back
//! to `picodata` runtime (using `cbus` unbounded channel).
//!
//! First, create a `parse` function.
//! This function will open a file, parse data, and send a result using an unbounded channel.
//!
//! ```no_run
//! use std::{fs, path::Path};
//! use picodata_plugin::interplay::cbus;
//!
//! fn parse(file: &Path, tx: cbus::unbounded::Sender<serde_json::Value>) {
//! let raw_json = fs::read_to_string(file).unwrap();
//! let object = serde_json::from_str(&raw_json).unwrap();
//! tx.send(object).unwrap();
//! }
//! ```
//!
//! Second, lets create a function for parse a list of files that may be used in `picodata` runtime.
//!
//! ```no_run
//! # use std::{fs, path::Path};
//! # fn parse(file: &Path, tx: unbounded::Sender<serde_json::Value>) {
//! # let raw_json = fs::read_to_string(file).unwrap();
//! # let object = serde_json::from_str(&raw_json).unwrap();
//! # tx.send(object).unwrap();
//! # }
//! use std::{path::PathBuf, sync::OnceLock};
//! use picodata_plugin::interplay::channel::unbounded;
//! use threadpool::ThreadPool;
//!
//! static THREAD_POOL: OnceLock<ThreadPool> = OnceLock::new();
//!
//! fn parse_files(files: &[PathBuf]) -> Vec<serde_json::Value> {
//! let (tx, rx) = unbounded::channel();
//! for file in files {
//! let file = file.clone();
//! let tx = tx.clone();
//! THREAD_POOL
//! .get_or_init(|| ThreadPool::new(16))
//! .execute(move || parse(&file, tx));
//! }
//!
//! let mut result = Vec::with_capacity(files.len());
//! while let Ok(object) = rx.receive() {
//! result.push(object)
//! };
//! result
//! }
//! ```
pub use cbus;