// pravega_client/byte/reader.rs

//
// Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
11use crate::client_factory::ClientFactoryAsync;
12use crate::segment::metadata::SegmentMetadataClient;
13use crate::segment::reader::PrefetchingAsyncSegmentReader;
14
15use pravega_client_shared::{ScopedSegment, ScopedStream};
16
17use std::convert::TryInto;
18use std::io::{Error, ErrorKind, SeekFrom};
19use std::sync::Arc;
20use uuid::Uuid;
21
/// A ByteReader enables reading raw bytes from a segment.
///
/// The ByteReader provides an API similar to the [`Read`] and [`Seek`] traits in the standard library,
/// but where the methods are asynchronous.
///
/// Internally ByteReader uses a prefetching reader that prefetches data from the server in the background.
/// The prefetched data is cached in memory so any sequential reads should be able to hit the cache.
///
/// Any seek operation will invalidate the cache and causes cache miss, so frequent seek and read operations
/// might not have good performance.
///
/// You can also wrap ByteReader with [`BufReader`], but doing so will not increase performance further.
///
/// [`Read`]: https://doc.rust-lang.org/std/io/trait.Read.html
/// [`Seek`]: https://doc.rust-lang.org/stable/std/io/trait.Seek.html
/// [`BufReader`]: https://doc.rust-lang.org/std/io/struct.BufReader.html
///
/// # Examples
/// ```no_run
/// use pravega_client_config::ClientConfigBuilder;
/// use pravega_client::client_factory::ClientFactoryAsync;
/// use pravega_client_shared::ScopedStream;
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
///     // assuming Pravega controller is running at endpoint `localhost:9090`
///     let config = ClientConfigBuilder::default()
///         .controller_uri("localhost:9090")
///         .build()
///         .expect("creating config");
///
///     let handle = Handle::current();
///     let client_factory = ClientFactoryAsync::new(config, handle);
///
///     // assuming scope:myscope, stream:mystream exist.
///     // notice that this stream should be a fixed sized single segment stream
///     let stream = ScopedStream::from("myscope/mystream");
///
///     let mut byte_reader = client_factory.create_byte_reader(stream).await;
///     let mut buf: Vec<u8> = vec![0; 4];
///     let size = byte_reader.read(&mut buf).await.expect("read from byte stream");
/// }
/// ```
pub struct ByteReader {
    // Random id generated at construction (Uuid::new_v4) identifying this reader instance.
    reader_id: Uuid,
    // The single segment backing this byte stream; a byte stream maps to exactly one segment.
    pub segment: ScopedSegment,
    // Prefetching reader; wrapped in `Option` so seek can `take()` it and rebuild
    // a fresh wrapper at the new offset.
    reader: Option<PrefetchingAsyncSegmentReader>,
    // Prefetch buffer size, reused whenever the prefetching reader is recreated.
    reader_buffer_size: usize,
    // Used to query the segment's current head/tail offsets.
    metadata_client: SegmentMetadataClient,
    // Provides the runtime handle needed when recreating the prefetching reader.
    factory: ClientFactoryAsync,
}
74
75impl ByteReader {
76    pub(crate) async fn new(stream: ScopedStream, factory: ClientFactoryAsync, buffer_size: usize) -> Self {
77        let segments = factory
78            .controller_client()
79            .get_head_segments(&stream)
80            .await
81            .expect("get head segments");
82        assert_eq!(
83            segments.len(),
84            1,
85            "Byte stream is configured with more than one segment"
86        );
87        let segment = segments.iter().next().unwrap().0.clone();
88        let scoped_segment = ScopedSegment {
89            scope: stream.scope.clone(),
90            stream: stream.stream.clone(),
91            segment,
92        };
93        let async_reader = factory.create_async_segment_reader(scoped_segment.clone()).await;
94        let async_reader_wrapper = PrefetchingAsyncSegmentReader::new(
95            factory.runtime_handle(),
96            Arc::new(Box::new(async_reader)),
97            0,
98            buffer_size,
99        );
100        let metadata_client = factory
101            .create_segment_metadata_client(scoped_segment.clone())
102            .await;
103        ByteReader {
104            reader_id: Uuid::new_v4(),
105            segment: scoped_segment,
106            reader: Some(async_reader_wrapper),
107            reader_buffer_size: buffer_size,
108            metadata_client,
109            factory,
110        }
111    }
112
113    /// Read data asynchronously.
114    ///
115    /// ```ignore
116    /// let mut byte_reader = client_factory.create_byte_reader(segment).await;
117    /// let mut buf: Vec<u8> = vec![0; 4];
118    /// let size = byte_reader.read(&mut buf).expect("read");
119    /// ```
120    pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
121        self.reader
122            .as_mut()
123            .unwrap()
124            .read(buf)
125            .await
126            .map_err(|e| Error::new(ErrorKind::Other, format!("Error: {:?}", e)))
127    }
128
129    /// Return the head of current readable data in the segment asynchronously.
130    ///
131    /// The ByteReader is initialized to read from the segment at offset 0. However, it might
132    /// encounter the SegmentIsTruncated error due to the segment has been truncated. In this case,
133    /// application should call this method to get the current readable head and read from it.
134    /// ```ignore
135    /// let mut byte_reader = client_factory.create_byte_reader_async(segment).await;
136    /// let offset = byte_reader.current_head().await.expect("get current head offset");
137    /// ```
138    pub async fn current_head(&self) -> std::io::Result<u64> {
139        self.metadata_client
140            .fetch_current_starting_head()
141            .await
142            .map(|i| i as u64)
143            .map_err(|e| Error::new(ErrorKind::Other, format!("{:?}", e)))
144    }
145
146    /// Return the tail offset of the segment asynchronously.
147    ///
148    /// ```ignore
149    /// let mut byte_reader = client_factory.create_byte_reader_async(segment).await;
150    /// let offset = byte_reader.current_tail().await.expect("get current tail offset");
151    /// ```
152    pub async fn current_tail(&self) -> std::io::Result<u64> {
153        self.metadata_client
154            .fetch_current_segment_length()
155            .await
156            .map(|i| i as u64)
157            .map_err(|e| Error::new(ErrorKind::Other, format!("{:?}", e)))
158    }
159
160    /// Return the current read offset.
161    ///
162    /// ```ignore
163    /// let mut byte_reader = client_factory.create_byte_reader(segment);
164    /// let offset = byte_reader.current_offset();
165    /// ```
166    pub fn current_offset(&self) -> u64 {
167        self.reader.as_ref().unwrap().offset as u64
168    }
169
170    /// Return the bytes that are available to read instantly without fetching from server.
171    ///
172    /// ByteReader has a buffer internally. This method returns the size of remaining data in that buffer.
173    /// ```ignore
174    /// let mut byte_reader = client_factory.create_byte_reader(segment);
175    /// let size = byte_reader.available();
176    /// ```
177    pub fn available(&self) -> usize {
178        self.reader.as_ref().unwrap().available()
179    }
180
181    /// The seek method for ByteReader allows seeking to a byte offset from the beginning
182    /// of the stream or a byte offset relative to the current position in the stream.
183    /// If the stream has been truncated, the byte offset will be relative to the original beginning of the stream.
184    pub async fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
185        match pos {
186            SeekFrom::Start(offset) => {
187                let offset = offset.try_into().map_err(|e| {
188                    Error::new(
189                        ErrorKind::InvalidInput,
190                        format!("Overflowed when converting offset to i64: {:?}", e),
191                    )
192                })?;
193                self.recreate_reader_wrapper(offset);
194                Ok(offset as u64)
195            }
196            SeekFrom::Current(offset) => {
197                let new_offset = self.reader.as_ref().unwrap().offset + offset;
198                if new_offset < 0 {
199                    Err(Error::new(
200                        ErrorKind::InvalidInput,
201                        "Cannot seek to a negative offset",
202                    ))
203                } else {
204                    self.recreate_reader_wrapper(new_offset);
205                    Ok(new_offset as u64)
206                }
207            }
208            SeekFrom::End(offset) => {
209                let tail = self
210                    .metadata_client
211                    .fetch_current_segment_length()
212                    .await
213                    .map_err(|e| Error::new(ErrorKind::Other, format!("{:?}", e)))?;
214                if tail + offset < 0 {
215                    Err(Error::new(
216                        ErrorKind::InvalidInput,
217                        "Cannot seek to a negative offset",
218                    ))
219                } else {
220                    let new_offset = tail + offset;
221                    self.recreate_reader_wrapper(new_offset);
222                    Ok(new_offset as u64)
223                }
224            }
225        }
226    }
227
228    fn recreate_reader_wrapper(&mut self, offset: i64) {
229        let internal_reader = self.reader.take().unwrap().extract_reader();
230        let new_reader_wrapper = PrefetchingAsyncSegmentReader::new(
231            self.factory.runtime_handle(),
232            internal_reader,
233            offset,
234            self.reader_buffer_size,
235        );
236        self.reader = Some(new_reader_wrapper);
237    }
238}
239
#[cfg(test)]
mod test {
    use super::*;
    use crate::byte::writer::ByteWriter;
    use crate::client_factory::ClientFactory;
    use crate::util::create_stream;
    use pravega_client_config::connection_type::{ConnectionType, MockType};
    use pravega_client_config::ClientConfigBuilder;
    use pravega_client_shared::PravegaNodeUri;
    use tokio::runtime::Runtime;

    #[test]
    fn test_byte_seek() {
        let (mut writer, mut reader, factory) = create_reader_and_writer(Runtime::new().unwrap());
        let rt = factory.runtime();
        // write 200 bytes
        let payload = vec![1; 200];
        rt.block_on(writer.write(&payload)).expect("write");
        rt.block_on(writer.flush()).expect("flush");

        // read 200 bytes from beginning
        let mut buf = vec![0; 200];
        let mut read = 0;
        while read != 200 {
            // Read into the unfilled tail of the buffer so a partial read does
            // not overwrite bytes that were already read in a prior iteration.
            let r = rt.block_on(reader.read(&mut buf[read..])).expect("read");
            read += r;
        }
        assert_eq!(read, 200);
        assert_eq!(buf, vec![1; 200]);
        // seek to head
        rt.block_on(reader.seek(SeekFrom::Start(0)))
            .expect("seek to head");
        assert_eq!(reader.current_offset(), 0);

        // seek to head with positive offset
        rt.block_on(reader.seek(SeekFrom::Start(100)))
            .expect("seek to head");
        assert_eq!(reader.current_offset(), 100);

        // seek to current with positive offset
        assert_eq!(reader.current_offset(), 100);
        rt.block_on(reader.seek(SeekFrom::Current(100)))
            .expect("seek to current");
        assert_eq!(reader.current_offset(), 200);

        // seek to current with negative offset
        rt.block_on(reader.seek(SeekFrom::Current(-100)))
            .expect("seek to current");
        assert_eq!(reader.current_offset(), 100);

        // seek to current invalid negative offset
        assert!(rt.block_on(reader.seek(SeekFrom::Current(-200))).is_err());

        // seek to end
        rt.block_on(reader.seek(SeekFrom::End(0))).expect("seek to end");
        assert_eq!(reader.current_offset(), 200);

        // seek to end with positive offset (past tail is allowed)
        assert!(rt.block_on(reader.seek(SeekFrom::End(1))).is_ok());

        // seek to end with negative offset
        rt.block_on(reader.seek(SeekFrom::End(-100)))
            .expect("seek to end");
        assert_eq!(reader.current_offset(), 100);

        // seek to end with invalid negative offset
        assert!(rt.block_on(reader.seek(SeekFrom::End(-300))).is_err());
    }

    #[test]
    fn test_byte_stream_truncate() {
        let (mut writer, mut reader, factory) = create_reader_and_writer(Runtime::new().unwrap());
        let rt = factory.runtime();
        // write 200 bytes
        let payload = vec![1; 200];
        rt.block_on(writer.write(&payload)).expect("write");
        rt.block_on(writer.flush()).expect("flush");

        // truncate to offset 100
        rt.block_on(writer.truncate_data_before(100)).expect("truncate");

        // reading before the truncation point must fail
        rt.block_on(reader.seek(SeekFrom::Start(0)))
            .expect("seek to head");
        let mut buf = vec![0; 100];
        assert!(rt.block_on(reader.read(&mut buf)).is_err());

        // read from current head
        let offset = rt.block_on(reader.current_head()).expect("get current head");
        rt.block_on(reader.seek(SeekFrom::Start(offset)))
            .expect("seek to new head");
        let mut buf = vec![0; 100];
        assert!(rt.block_on(reader.read(&mut buf)).is_ok());
        assert_eq!(buf, vec![1; 100]);
    }

    #[test]
    fn test_byte_stream_seal() {
        const BYTE_SIZE: usize = 200;

        let (mut writer, mut reader, factory) = create_reader_and_writer(Runtime::new().unwrap());
        let rt = factory.runtime();

        // write 200 bytes
        let payload = vec![1; BYTE_SIZE];
        rt.block_on(writer.write(&payload)).expect("write");
        rt.block_on(writer.flush()).expect("flush");

        // seal the segment
        rt.block_on(writer.seal()).expect("seal");

        // sealed data is still readable
        rt.block_on(reader.seek(SeekFrom::Start(0)))
            .expect("seek to new head");
        let mut buf = vec![0; BYTE_SIZE];
        assert!(rt.block_on(reader.read(&mut buf)).is_ok());
        assert_eq!(buf, vec![1; BYTE_SIZE]);

        // writing to a sealed segment must fail at write or at flush
        let payload = vec![1; BYTE_SIZE];
        let write_result = rt.block_on(writer.write(&payload));
        let flush_result = rt.block_on(writer.flush());
        assert!(write_result.is_err() || flush_result.is_err());
    }

    #[test]
    #[should_panic(expected = "Byte stream is configured with more than one segment")]
    fn test_invalid_stream_config() {
        let config = ClientConfigBuilder::default()
            .connection_type(ConnectionType::Mock(MockType::Happy))
            .mock(true)
            .controller_uri(PravegaNodeUri::from("127.0.0.2:9091".to_string()))
            .build()
            .unwrap();
        let factory = ClientFactory::new(config);
        // a two-segment stream is invalid for a byte reader
        factory.runtime().block_on(create_stream(
            &factory,
            "testScopeInvalid",
            "testStreamInvalid",
            2,
        ));
        let stream = ScopedStream::from("testScopeInvalid/testStreamInvalid");
        factory.runtime().block_on(factory.create_byte_reader(stream));
    }

    // Build a mock-backed writer/reader pair over a fresh single-segment stream.
    fn create_reader_and_writer(runtime: Runtime) -> (ByteWriter, ByteReader, ClientFactory) {
        let config = ClientConfigBuilder::default()
            .connection_type(ConnectionType::Mock(MockType::Happy))
            .mock(true)
            .controller_uri(PravegaNodeUri::from("127.0.0.2:9091".to_string()))
            .build()
            .unwrap();
        let factory = ClientFactory::new_with_runtime(config, runtime);
        factory
            .runtime()
            .block_on(create_stream(&factory, "testScope", "testStream", 1));
        let stream = ScopedStream::from("testScope/testStream");
        let writer = factory
            .runtime()
            .block_on(factory.create_byte_writer(stream.clone()));
        let reader = factory.runtime().block_on(factory.create_byte_reader(stream));
        (writer, reader, factory)
    }
}