tarantool_rs/client/stream.rs
1use std::fmt;
2
3use async_trait::async_trait;
4
5use rmpv::Value;
6
7use super::{Connection, Transaction, TransactionBuilder};
8use crate::{codec::request::EncodedRequest, Executor, Result};
9
10/// Abstraction, providing sequential processing of requests.
11///
12/// With streams there is a guarantee that the server instance will not handle the next request in a stream until it has completed the previous one ([docs](https://www.tarantool.io/en/doc/latest/dev_guide/internals/box_protocol/#binary-protocol-streams)).
13///
14/// # Example
15///
16/// ```rust,compile
17/// use tarantool_rs::{Connection, Executor, ExecutorExt};
18/// # use futures::FutureExt;
19/// # use rmpv::Value;
20///
21/// # async fn async_wrapper() {
22/// let connection = Connection::builder().build("localhost:3301").await.unwrap();
23///
24/// // This will print 'fast' and then 'slow'
25/// let eval_slow_fut = connection
26/// .eval("fiber = require('fiber'); fiber.sleep(0.5); return ...;", ("slow", ))
27/// .inspect(|res| println!("{:?}", res));
28/// let eval_fast_fut = connection
29/// .eval("return ...;", ("fast", ))
30/// .inspect(|res| println!("{:?}", res));
31/// let _ = tokio::join!(eval_slow_fut, eval_fast_fut);
32///
33/// // This will print 'slow' and then 'fast', since slow request was created first and have smaller sync
34/// let stream = connection.stream();
35/// let eval_slow_fut = stream
36/// .eval("fiber = require('fiber'); fiber.sleep(0.5); return ...;", ("slow", ))
37/// .inspect(|res| println!("{:?}", res));
38/// let eval_fast_fut = stream
39/// .eval("return ...;", ("fast", ))
40/// .inspect(|res| println!("{:?}", res));
41/// let _ = tokio::join!(eval_slow_fut, eval_fast_fut);
42/// # }
43/// ```
44
45#[derive(Clone)]
46pub struct Stream {
47 conn: Connection,
48 stream_id: u32,
49}
50
51// TODO: convert stream to transaction and back
52impl Stream {
53 pub(crate) fn new(conn: Connection) -> Self {
54 let stream_id = conn.next_stream_id();
55 Self { conn, stream_id }
56 }
57}
58
59#[async_trait]
60impl Executor for Stream {
61 async fn send_encoded_request(&self, mut request: EncodedRequest) -> Result<Value> {
62 request.stream_id = Some(self.stream_id);
63 self.conn.send_encoded_request(request).await
64 }
65
66 fn stream(&self) -> Stream {
67 self.conn.stream()
68 }
69
70 fn transaction_builder(&self) -> TransactionBuilder {
71 self.conn.transaction_builder()
72 }
73
74 async fn transaction(&self) -> Result<Transaction> {
75 self.conn.transaction().await
76 }
77
78 async fn get_cached_sql_statement_id(&self, statement: &str) -> Option<u64> {
79 self.conn.get_cached_sql_statement_id(statement).await
80 }
81}
82
83impl fmt::Debug for Stream {
84 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
85 f.debug_struct("Stream")
86 .field("stream_id", &self.stream_id)
87 .finish()
88 }
89}