pravega_client/
client_factory.rs

1//
2// Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10#![allow(bare_trait_objects)]
11
12//! Factory to create components in Pravega Rust client.
13//!
14//! Applications should always use this ClientFactory to initialize components.
15//!
16use crate::byte::reader::ByteReader;
17use crate::byte::writer::ByteWriter;
18use crate::event::reader_group::{ReaderGroup, ReaderGroupConfig, ReaderGroupConfigBuilder};
19use crate::event::transactional_writer::TransactionalEventWriter;
20use crate::event::writer::EventWriter;
21use crate::segment::metadata::SegmentMetadataClient;
22use crate::segment::raw_client::RawClientImpl;
23use crate::segment::reader::AsyncSegmentReaderImpl;
24use crate::sync::synchronizer::Synchronizer;
25use crate::sync::table::{Table, TableError};
26cfg_if::cfg_if! {
27    if #[cfg(feature = "integration-test")] {
28        use crate::test_utils::{RawClientWrapper, SegmentReaderWrapper};
29    }
30}
31
32use crate::index::Fields;
33use pravega_client_auth::DelegationTokenProvider;
34use pravega_client_config::ClientConfig;
35use pravega_client_shared::{DelegationToken, PravegaNodeUri, Scope, ScopedSegment, ScopedStream, WriterId};
36use pravega_connection_pool::connection_pool::ConnectionPool;
37use pravega_controller_client::mock_controller::MockController;
38use pravega_controller_client::{ControllerClient, ControllerClientImpl};
39use pravega_wire_protocol::connection_factory::{
40    ConnectionFactory, ConnectionFactoryConfig, SegmentConnectionManager,
41};
42
43use crate::index::{IndexReader, IndexWriter};
44use crate::util::meta::MetaClient;
45use std::fmt;
46use std::fmt::Debug;
47use std::sync::Arc;
48use tokio::runtime::{Handle, Runtime};
49use tracing::info;
50
51/// Applications should use ClientFactory to create resources they need.
52///
53/// ClientFactory contains a connection pool that is shared by all the readers and writers it creates.
54/// It also contains a tokio runtime that is used to drive async tasks. Spawned tasks in readers and
55/// writers are tied to this runtime.
56///
57/// Note that dropping Runtime in async context is not a good practice and it will have warning messages.
58/// ClientFactory is the only place that's holding the Runtime, so it should not be used in any async contexts.
59/// You can use ['ClientFactoryAsync'] in async contexts instead.
60///
61/// # Examples
62/// ```no_run
63/// use pravega_client_config::ClientConfigBuilder;
64/// use pravega_client::client_factory::ClientFactory;
65///
66/// fn main() {
67///    let config = ClientConfigBuilder::default()
68///         .controller_uri("localhost:8000")
69///         .build()
70///         .expect("create config");
71///     let client_factory = ClientFactory::new(config);
72/// }
73/// ```
74/// ```no_run
75/// use pravega_client_config::ClientConfigBuilder;
76/// use pravega_client::client_factory::ClientFactoryAsync;
77/// use tokio::runtime::Handle;
78///
79/// #[tokio::main]
80/// async fn main() {
81///    let config = ClientConfigBuilder::default()
82///         .controller_uri("localhost:8000")
83///         .build()
84///         .expect("create config");
85///     let handle = Handle::try_current().expect("get current runtime handle");
86///     let client_factory = ClientFactoryAsync::new(config, handle);
87/// }
88/// ```
89/// [`ClientFactoryAsync`]: ClientFactoryAsync
90pub struct ClientFactory {
91    runtime: Runtime,
92    client_factory_async: ClientFactoryAsync,
93}
94
95impl ClientFactory {
96    pub fn new(config: ClientConfig) -> ClientFactory {
97        let rt = tokio::runtime::Runtime::new().expect("create runtime");
98        ClientFactory::new_with_runtime(config, rt)
99    }
100
101    pub fn new_with_runtime(config: ClientConfig, rt: Runtime) -> ClientFactory {
102        let async_factory = ClientFactoryAsync::new(config, rt.handle().clone());
103        ClientFactory {
104            runtime: rt,
105            client_factory_async: async_factory,
106        }
107    }
108
109    pub fn runtime(&self) -> &Runtime {
110        &self.runtime
111    }
112
113    pub fn runtime_handle(&self) -> Handle {
114        self.runtime.handle().clone()
115    }
116
117    pub fn config(&self) -> &ClientConfig {
118        self.client_factory_async.config()
119    }
120
121    pub fn controller_client(&self) -> &dyn ControllerClient {
122        self.client_factory_async.controller_client()
123    }
124
125    pub fn create_event_writer(&self, stream: ScopedStream) -> EventWriter {
126        self.client_factory_async.create_event_writer(stream)
127    }
128
129    pub async fn create_reader_group(&self, reader_group_name: String, stream: ScopedStream) -> ReaderGroup {
130        info!(
131            "Creating reader group {:?} to read data from stream {:?}",
132            reader_group_name, stream
133        );
134        self.client_factory_async
135            .create_reader_group(reader_group_name, stream)
136            .await
137    }
138
139    ///
140    /// Create a Reader Group based on the ReaderGroupConfig.
141    ///
142    pub async fn create_reader_group_with_config(
143        &self,
144        reader_group_name: String,
145        reader_group_config: ReaderGroupConfig,
146        scope: Scope,
147    ) -> ReaderGroup {
148        info!(
149            "Creating reader group {:?} to read data from streams {:?}",
150            reader_group_name,
151            reader_group_config.get_streams()
152        );
153        self.client_factory_async
154            .create_reader_group_with_config(scope, reader_group_name, reader_group_config)
155            .await
156    }
157
158    ///
159    /// Delete a ReaderGroup.
160    ///
161    pub async fn delete_reader_group(
162        &self,
163        scope: Scope,
164        reader_group_name: String,
165    ) -> Result<(), TableError> {
166        info!(
167            "Deleting reader group {:?} under scope {:?}",
168            reader_group_name, scope
169        );
170        self.client_factory_async
171            .delete_reader_group(scope, reader_group_name)
172            .await
173    }
174
175    pub async fn create_transactional_event_writer(
176        &self,
177        stream: ScopedStream,
178        writer_id: WriterId,
179    ) -> TransactionalEventWriter {
180        self.client_factory_async
181            .create_transactional_event_writer(stream, writer_id)
182            .await
183    }
184
185    pub async fn create_byte_writer(&self, stream: ScopedStream) -> ByteWriter {
186        self.client_factory_async.create_byte_writer(stream).await
187    }
188
189    pub async fn create_byte_reader(&self, stream: ScopedStream) -> ByteReader {
190        self.client_factory_async.create_byte_reader(stream).await
191    }
192
193    pub async fn create_index_writer<T: Fields + PartialOrd + PartialEq + Debug>(
194        &self,
195        stream: ScopedStream,
196    ) -> IndexWriter<T> {
197        self.client_factory_async.create_index_writer(stream).await
198    }
199
200    pub async fn create_index_reader(&self, stream: ScopedStream) -> IndexReader {
201        self.client_factory_async.create_index_reader(stream).await
202    }
203
204    pub async fn create_table(&self, scope: Scope, name: String) -> Table {
205        self.client_factory_async.create_table(scope, name).await
206    }
207
208    pub async fn create_synchronizer(&self, scope: Scope, name: String) -> Synchronizer {
209        self.client_factory_async.create_synchronizer(scope, name).await
210    }
211
212    pub fn to_async(&self) -> ClientFactoryAsync {
213        self.client_factory_async.clone()
214    }
215
216    pub(crate) async fn create_async_segment_reader(&self, segment: ScopedSegment) -> AsyncSegmentReaderImpl {
217        self.client_factory_async
218            .create_async_segment_reader(segment)
219            .await
220    }
221
222    pub(crate) async fn create_raw_client(&self, segment: &ScopedSegment) -> RawClientImpl<'_> {
223        self.client_factory_async.create_raw_client(segment).await
224    }
225
226    pub(crate) fn create_raw_client_for_endpoint(&self, endpoint: PravegaNodeUri) -> RawClientImpl<'_> {
227        self.client_factory_async.create_raw_client_for_endpoint(endpoint)
228    }
229
230    pub(crate) async fn create_delegation_token_provider(
231        &self,
232        stream: ScopedStream,
233    ) -> DelegationTokenProvider {
234        self.client_factory_async
235            .create_delegation_token_provider(stream)
236            .await
237    }
238
239    pub(crate) async fn create_segment_metadata_client(
240        &self,
241        segment: ScopedSegment,
242    ) -> SegmentMetadataClient {
243        self.client_factory_async
244            .create_segment_metadata_client(segment)
245            .await
246    }
247
248    #[doc(hidden)]
249    #[cfg(feature = "integration-test")]
250    pub async fn create_raw_client_wrapper(&self, segment: &ScopedSegment) -> RawClientWrapper<'_> {
251        let endpoint = self
252            .client_factory_async
253            .controller_client
254            .get_endpoint_for_segment(segment)
255            .await
256            .expect("get endpoint for segment");
257        RawClientWrapper::new(
258            &self.client_factory_async.connection_pool,
259            endpoint,
260            self.client_factory_async.config.request_timeout,
261        )
262    }
263
264    #[doc(hidden)]
265    #[cfg(feature = "integration-test")]
266    pub async fn create_segment_reader_wrapper(&self, segment: ScopedSegment) -> SegmentReaderWrapper {
267        SegmentReaderWrapper::new(
268            segment.clone(),
269            self.to_async(),
270            self.client_factory_async
271                .create_delegation_token_provider(ScopedStream::from(&segment))
272                .await,
273        )
274        .await
275    }
276}
277
278#[derive(Clone)]
279pub struct ClientFactoryAsync {
280    connection_pool: Arc<ConnectionPool<SegmentConnectionManager>>,
281    controller_client: Arc<Box<dyn ControllerClient>>,
282    config: Arc<ClientConfig>,
283    runtime_handle: Handle,
284}
285
286impl ClientFactoryAsync {
287    pub fn new(config: ClientConfig, handle: Handle) -> Self {
288        let cf = ConnectionFactory::create(ConnectionFactoryConfig::from(&config));
289        let pool = ConnectionPool::new(SegmentConnectionManager::new(cf, config.max_connections_in_pool));
290        let controller = if config.mock {
291            Box::new(MockController::new(config.controller_uri.clone())) as Box<dyn ControllerClient>
292        } else {
293            Box::new(ControllerClientImpl::new(config.clone(), &handle)) as Box<dyn ControllerClient>
294        };
295        ClientFactoryAsync {
296            connection_pool: Arc::new(pool),
297            controller_client: Arc::new(controller),
298            config: Arc::new(config),
299            runtime_handle: handle,
300        }
301    }
302    pub fn config(&self) -> &ClientConfig {
303        &self.config
304    }
305
306    pub fn create_event_writer(&self, stream: ScopedStream) -> EventWriter {
307        EventWriter::new(stream, self.clone())
308    }
309
310    pub async fn create_stream_meta_client(&self, stream: ScopedStream) -> MetaClient {
311        MetaClient::new(stream, self.clone())
312    }
313
314    ///
315    /// Create a ReaderGroup with the specified name to read from the specified Stream.
316    /// The readers will read from the HEAD/beginning of the Stream.
317    ///
318    pub async fn create_reader_group(&self, reader_group_name: String, stream: ScopedStream) -> ReaderGroup {
319        let scope = stream.scope.clone();
320        let rg_config = ReaderGroupConfigBuilder::default().add_stream(stream).build();
321        ReaderGroup::create(scope, reader_group_name, rg_config, self.clone()).await
322    }
323
324    ///
325    /// Create a ReaderGroup with the streams configured in the ReaderGroupConfig.
326    ///
327    pub async fn create_reader_group_with_config(
328        &self,
329        scope: Scope,
330        reader_group_name: String,
331        rg_config: ReaderGroupConfig,
332    ) -> ReaderGroup {
333        ReaderGroup::create(scope, reader_group_name, rg_config, self.clone()).await
334    }
335
336    ///
337    /// Delete a ReaderGroup given for a given scope.
338    ///
339    pub async fn delete_reader_group(
340        &self,
341        scope: Scope,
342        reader_group_name: String,
343    ) -> Result<(), TableError> {
344        ReaderGroup::delete(scope, reader_group_name, self.clone()).await
345    }
346
347    pub async fn create_transactional_event_writer(
348        &self,
349        stream: ScopedStream,
350        writer_id: WriterId,
351    ) -> TransactionalEventWriter {
352        TransactionalEventWriter::new(stream, writer_id, self.clone()).await
353    }
354
355    pub async fn create_byte_writer(&self, stream: ScopedStream) -> ByteWriter {
356        ByteWriter::new(stream, self.clone()).await
357    }
358
359    pub async fn create_byte_reader(&self, stream: ScopedStream) -> ByteReader {
360        ByteReader::new(stream, self.clone(), self.config().reader_wrapper_buffer_size()).await
361    }
362
363    pub async fn create_index_writer<T: Fields + PartialOrd + PartialEq + Debug>(
364        &self,
365        stream: ScopedStream,
366    ) -> IndexWriter<T> {
367        IndexWriter::new(self.clone(), stream).await
368    }
369
370    pub async fn create_index_reader(&self, stream: ScopedStream) -> IndexReader {
371        IndexReader::new(self.clone(), stream).await
372    }
373
374    pub async fn create_table(&self, scope: Scope, name: String) -> Table {
375        Table::new(scope, name, self.clone())
376            .await
377            .expect("Failed to create Table map")
378    }
379
380    pub async fn create_synchronizer(&self, scope: Scope, name: String) -> Synchronizer {
381        Synchronizer::new(scope, name, self.clone()).await
382    }
383
384    pub fn controller_client(&self) -> &dyn ControllerClient {
385        &**self.controller_client
386    }
387
388    pub fn runtime_handle(&self) -> Handle {
389        self.runtime_handle.clone()
390    }
391
392    pub(crate) async fn create_async_segment_reader(&self, segment: ScopedSegment) -> AsyncSegmentReaderImpl {
393        AsyncSegmentReaderImpl::new(
394            segment.clone(),
395            self.clone(),
396            self.create_delegation_token_provider(ScopedStream::from(&segment))
397                .await,
398        )
399        .await
400    }
401
402    pub(crate) async fn create_raw_client(&self, segment: &ScopedSegment) -> RawClientImpl<'_> {
403        let endpoint = self
404            .controller_client
405            .get_endpoint_for_segment(segment)
406            .await
407            .expect("get endpoint for segment");
408        RawClientImpl::new(&self.connection_pool, endpoint, self.config.request_timeout)
409    }
410
411    pub(crate) fn create_raw_client_for_endpoint(&self, endpoint: PravegaNodeUri) -> RawClientImpl<'_> {
412        RawClientImpl::new(&self.connection_pool, endpoint, self.config.request_timeout)
413    }
414
415    pub(crate) async fn create_segment_metadata_client(
416        &self,
417        segment: ScopedSegment,
418    ) -> SegmentMetadataClient {
419        SegmentMetadataClient::new(segment, self.clone()).await
420    }
421
422    pub(crate) async fn create_delegation_token_provider(
423        &self,
424        stream: ScopedStream,
425    ) -> DelegationTokenProvider {
426        let token_provider = DelegationTokenProvider::new(stream);
427        if !self.config.is_auth_enabled {
428            let empty_token = DelegationToken::new("".to_string(), None);
429            token_provider.populate(empty_token).await;
430        }
431        token_provider
432    }
433
434    pub(crate) fn get_connection_pool(&self) -> &ConnectionPool<SegmentConnectionManager> {
435        &self.connection_pool
436    }
437}
438
439impl fmt::Debug for ClientFactoryAsync {
440    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
441        f.debug_struct("ClientFactoryInternal")
442            .field("connection pool", &self.connection_pool)
443            .field("client config,", &self.config)
444            .finish()
445    }
446}