Skip to main content

couchbase_core/memdx/
op_bootstrap.rs

1/*
2 *
3 *  * Copyright (c) 2025 Couchbase, Inc.
4 *  *
5 *  * Licensed under the Apache License, Version 2.0 (the "License");
6 *  * you may not use this file except in compliance with the License.
7 *  * You may obtain a copy of the License at
8 *  *
9 *  *    http://www.apache.org/licenses/LICENSE-2.0
10 *  *
11 *  * Unless required by applicable law or agreed to in writing, software
12 *  * distributed under the License is distributed on an "AS IS" BASIS,
13 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  * See the License for the specific language governing permissions and
15 *  * limitations under the License.
16 *
17 */
18
19use tokio::select;
20use tokio::time::{sleep, Instant};
21use tracing::warn;
22
23use crate::memdx::dispatcher::Dispatcher;
24use crate::memdx::error::CancellationErrorKind;
25use crate::memdx::error::Result;
26use crate::memdx::op_auth_saslauto::{OpSASLAutoEncoder, OpsSASLAuthAuto, SASLAuthAutoOptions};
27use crate::memdx::op_auth_saslplain::OpSASLPlainEncoder;
28use crate::memdx::pendingop::{run_op_future_with_deadline, StandardPendingOp};
29use crate::memdx::request::{
30    GetClusterConfigRequest, GetErrorMapRequest, HelloRequest, SASLAuthRequest,
31    SASLListMechsRequest, SASLStepRequest, SelectBucketRequest,
32};
33use crate::memdx::response::{
34    BootstrapResult, GetClusterConfigResponse, GetErrorMapResponse, HelloResponse,
35    SASLAuthResponse, SASLListMechsResponse, SASLStepResponse, SelectBucketResponse,
36};
37
38pub trait OpBootstrapEncoder {
39    fn hello<D>(
40        &self,
41        dispatcher: &D,
42        request: HelloRequest,
43    ) -> impl std::future::Future<Output = Result<StandardPendingOp<HelloResponse>>>
44    where
45        D: Dispatcher;
46
47    fn get_error_map<D>(
48        &self,
49        dispatcher: &D,
50        request: GetErrorMapRequest,
51    ) -> impl std::future::Future<Output = Result<StandardPendingOp<GetErrorMapResponse>>>
52    where
53        D: Dispatcher;
54
55    fn select_bucket<D>(
56        &self,
57        dispatcher: &D,
58        request: SelectBucketRequest,
59    ) -> impl std::future::Future<Output = Result<StandardPendingOp<SelectBucketResponse>>>
60    where
61        D: Dispatcher;
62
63    fn get_cluster_config<D>(
64        &self,
65        dispatcher: &D,
66        request: GetClusterConfigRequest,
67    ) -> impl std::future::Future<Output = Result<StandardPendingOp<GetClusterConfigResponse>>>
68    where
69        D: Dispatcher;
70}
71
/// Stateless namespace type for the bootstrap sequence.
///
/// It carries no data; the bootstrap logic lives in its associated functions.
pub struct OpBootstrap {}
73
74#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
75pub struct BootstrapOptions {
76    pub hello: Option<HelloRequest>,
77    pub get_error_map: Option<GetErrorMapRequest>,
78    pub auth: Option<SASLAuthAutoOptions>,
79    pub select_bucket: Option<SelectBucketRequest>,
80    pub deadline: Instant,
81    pub get_cluster_config: Option<GetClusterConfigRequest>,
82}
83
84impl OpBootstrap {
85    // bootstrap is currently not pipelined. SCRAM, and the general retry behaviour within sasl auto,
86    // make pipelining complex. It's a bit of a niche optimization so we can improve later.
87    pub async fn bootstrap<E, D>(
88        encoder: E,
89        dispatcher: &D,
90        opts: BootstrapOptions,
91    ) -> Result<BootstrapResult>
92    where
93        E: OpBootstrapEncoder + OpSASLAutoEncoder,
94        D: Dispatcher,
95    {
96        let mut result = BootstrapResult {
97            hello: None,
98            error_map: None,
99            cluster_config: None,
100        };
101
102        if let Some(req) = opts.hello {
103            result.hello =
104                match run_op_future_with_deadline(opts.deadline, encoder.hello(dispatcher, req))
105                    .await
106                {
107                    Ok(r) => Some(r),
108                    Err(e) => {
109                        warn!("Hello failed {e}");
110                        None
111                    }
112                };
113        };
114
115        if let Some(req) = opts.get_error_map {
116            result.error_map = match run_op_future_with_deadline(
117                opts.deadline,
118                encoder.get_error_map(dispatcher, req),
119            )
120            .await
121            {
122                Ok(r) => Some(r),
123                Err(e) => {
124                    warn!("Get error map failed {e}");
125                    None
126                }
127            };
128        };
129        if let Some(req) = opts.auth {
130            let op_auto = OpsSASLAuthAuto {};
131            match op_auto
132                .sasl_auth_auto(&encoder, dispatcher, opts.deadline, req)
133                .await
134            {
135                Ok(_) => {}
136                Err(e) => {
137                    warn!("Auth failed {e}");
138                    return Err(e);
139                }
140            };
141        }
142
143        if let Some(req) = opts.select_bucket {
144            match run_op_future_with_deadline(opts.deadline, encoder.select_bucket(dispatcher, req))
145                .await
146            {
147                Ok(r) => Some(r),
148                Err(e) => {
149                    warn!("Select bucket failed {e}");
150                    return Err(e);
151                }
152            };
153        }
154
155        if let Some(req) = opts.get_cluster_config {
156            result.cluster_config = match run_op_future_with_deadline(
157                opts.deadline,
158                encoder.get_cluster_config(dispatcher, req),
159            )
160            .await
161            {
162                Ok(r) => Some(r),
163                Err(e) => {
164                    warn!("Get cluster config failed {e}");
165                    None
166                }
167            }
168        };
169
170        Ok(result)
171    }
172}