Communication between the picodata runtime and any third-party runtime, such as tokio,
smol, or even your own custom runtime.
This module provides two APIs:
- low-level cbus API - with raw channels between a custom runtime and the picodata runtime
- high-level tros API - with the ability to execute async tasks in third-party async runtimes and wait for execution results.
You should prefer the second one. Use the cbus API at your own risk, as there are some pitfalls unbeknownst even to the authors of the runtime.
§Problem
Picodata uses its own asynchronous runtime, and all plugin code is executed by this runtime.
You can see more in the documentation.
If you want to use any other runtime, you need a way to communicate
with it from the picodata runtime; that’s why the interplay module exists.
§Tros
Tros is a high-level bridge between picodata and external runtimes.
With tros, you don’t care about the complexity of interaction -
you don’t even use channels directly.
It hides all details and provides a simplified API.
Here is how to make an async HTTP request with tros:
use picodata_plugin::interplay::tros;
use picodata_plugin::system::tarantool::fiber;
let text = tros::TokioExecutor::new(tros::transport::PicodataTransport::default())
    .exec(async { reqwest::get("http://example.com").await?.text().await })
    .unwrap()
    .unwrap();
assert!(text.contains("Example Domain"));
§Cbus
When you communicate with tokio, a third-party system thread, or any other runtime,
there are two questions that need to be answered:
- how to send data from a picodata plugin into this runtime?
- how to receive data from the third-party runtime in a picodata plugin?
The answer to the first question is pretty straightforward: you can use the channels
provided by the third-party runtime.
For example, if you send data into the tokio runtime,
just use tokio channels.
But this solution doesn’t help with the second question, because tokio channels don’t know anything
about the picodata runtime.
This matters because when you try to receive data in picodata plugin code,
the picodata runtime should know about it and suspend the current fiber,
then wake it up when the data is ready.
That’s why we need special channels that allow working in a completely
asynchronous mode: cbus channels.
Check the cbus documentation if you want to know the internals.
Note that cbus is a lower-level tool than tros, so using it may be more complex than
you expect.
We recommend using the cbus API for stream data processing or similar tasks.
Otherwise, we recommend the tros API due to its simplicity.
Here is a description of the cbus channels:
- cbus::oneshot::channel - provides oneshot channels; with these channels you can send data just once.
- cbus::unbounded::channel - provides unbounded channels; with these channels you can send data any number of times.
- cbus::sync::std::channel - provides sync std channels; these channels are similar to unbounded ones, but when you call the send function from a std thread, it returns only when the picodata plugin code receives the data.
- cbus::sync::tokio::channel - provides sync tokio channels; these channels are similar to unbounded ones, but when you call the send function from a tokio task, it returns only when the picodata plugin code receives the data.
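The difference between the unbounded and sync flavours can be illustrated with the standard library alone. The sketch below is only an analogy and does not use the cbus API: it assumes that `std::sync::mpsc::channel` behaves like an unbounded channel (`send` returns immediately) and that `std::sync::mpsc::sync_channel(0)` behaves like a sync channel (`send` returns only after the receiver has taken the value). The function names `unbounded_demo` and `sync_demo` are hypothetical.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::Duration;

/// Unbounded-style channel: `send` never waits for the receiver.
fn unbounded_demo() -> Vec<u32> {
    let (tx, rx) = mpsc::channel();
    let producer = thread::spawn(move || {
        for i in 0..3 {
            // Returns immediately; the value is queued.
            tx.send(i).unwrap();
        }
        // `tx` is dropped here, which closes the channel.
    });
    // The producer finishes before anyone starts receiving.
    producer.join().unwrap();
    rx.iter().collect()
}

/// Sync-style (rendezvous) channel: `send` returns only after the
/// receiver has taken the value.
/// Returns (sent flag before recv, received value, sent flag after recv).
fn sync_demo() -> (bool, u32, bool) {
    let (tx, rx) = mpsc::sync_channel::<u32>(0);
    let sent = Arc::new(AtomicBool::new(false));
    let sent_in_producer = Arc::clone(&sent);
    let producer = thread::spawn(move || {
        // Blocks until the receiver calls `recv`.
        tx.send(42).unwrap();
        sent_in_producer.store(true, Ordering::SeqCst);
    });
    // Give the producer time to reach `send`; it cannot get past it yet.
    thread::sleep(Duration::from_millis(50));
    let before = sent.load(Ordering::SeqCst);
    let value = rx.recv().unwrap();
    producer.join().unwrap();
    let after = sent.load(Ordering::SeqCst);
    (before, value, after)
}
```

Calling `unbounded_demo()` collects all queued values after the producer has already finished, while `sync_demo()` shows that the producer stays blocked in `send` until the value is received. With cbus the receiving side is a picodata fiber rather than an OS thread, so the real channels additionally cooperate with the picodata runtime.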
§Cbus example
Let’s try to solve a small task: we need to parse multiple files.
Imagine we have files with big JSON-encoded objects in them.
We want to parse the files in parallel because the number of files is huge, and we would like to
get the result as quickly as possible.
We would also like to free the picodata runtime from the heavy task of parsing,
because opening and parsing a file are blocking operations that may “lock” the picodata
runtime for a long time.
Okay, now let’s solve it.
We will use a separate thread pool to parse the files: its threads will
receive parsing tasks (file names) and send the parsed data back
to the picodata runtime (using a cbus unbounded channel).
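The shape of this solution (fan tasks out to worker threads, collect results through one channel) can be sketched with the standard library before bringing in cbus. This is a simplified illustration under stated assumptions, not the plugin code: it uses `std::sync::mpsc` instead of a cbus channel, spawns one thread per task instead of using a pool, and replaces JSON parsing with a hypothetical `count_words` helper.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for the expensive parsing step.
fn count_words(input: &str) -> usize {
    input.split_whitespace().count()
}

/// Processes every input on its own thread and gathers the results.
fn count_words_parallel(inputs: &[String]) -> Vec<usize> {
    let (tx, rx) = mpsc::channel();
    for input in inputs {
        let input = input.clone();
        // Each worker gets its own sender handle.
        let tx = tx.clone();
        thread::spawn(move || {
            tx.send(count_words(&input)).unwrap();
        });
    }
    // Drop the original sender: the receiving loop ends only when
    // every sender handle is gone.
    drop(tx);
    let mut result: Vec<usize> = rx.iter().collect();
    // Results arrive in a nondeterministic order, so sort them.
    result.sort_unstable();
    result
}
```

For example, passing the three strings "a b c", "d" and "e f" yields the word counts 1, 2 and 3 once sorted. The detail worth noticing is the explicit `drop(tx)`: with std channels the receiving side sees the end of the stream only after all senders, including the original one, have been dropped.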
First, create a parse function.
This function will open a file, parse data, and send a result using an unbounded channel.
use std::{fs, path::Path};
use picodata_plugin::interplay::cbus;
fn parse(file: &Path, tx: cbus::unbounded::Sender<serde_json::Value>) {
    let raw_json = fs::read_to_string(file).unwrap();
    let object = serde_json::from_str(&raw_json).unwrap();
    tx.send(object).unwrap();
}
Second, let’s create a function that parses a list of files and can be called from the picodata runtime.
use std::{path::PathBuf, sync::OnceLock};
use picodata_plugin::interplay::channel::unbounded;
use threadpool::ThreadPool;
static THREAD_POOL: OnceLock<ThreadPool> = OnceLock::new();
fn parse_files(files: &[PathBuf]) -> Vec<serde_json::Value> {
    let (tx, rx) = unbounded::channel();
    for file in files {
        let file = file.clone();
        let tx = tx.clone();
        THREAD_POOL
            .get_or_init(|| ThreadPool::new(16))
            .execute(move || parse(&file, tx));
    }
    // Drop the original sender so that `receive` reports disconnection
    // once every worker has finished and dropped its own clone.
    drop(tx);
    let mut result = Vec::with_capacity(files.len());
    while let Ok(object) = rx.receive() {
        result.push(object);
    }
    result
}
§Re-exports
pub use tros;