1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
//! Native rust streams for streaming searches
//!
use futures::Stream;
use ldap3::{LdapError, LdapResult, SearchEntry, SearchStream, StreamState};
use tokio::{runtime::Handle, task::block_in_place};
use tracing::{Level, debug, error, instrument, warn};
use crate::{Error, Record};
/// This wrapper exists solely for the purpose of running some cleanup in `drop()`.
///
/// This should be refactored to implement `AsyncDrop` when it gets stabilized:
/// https://github.com/rust-lang/rust/issues/126482
struct StreamDropWrapper<'a, S, A>
where
S: AsRef<str> + Send + Sync + 'a,
A: AsRef<[S]> + Send + Sync + 'a,
{
pub search_stream: SearchStream<'a, S, A>,
}
impl<'a, S, A> Drop for StreamDropWrapper<'a, S, A>
where
S: AsRef<str> + Send + Sync + 'a,
A: AsRef<[S]> + Send + Sync + 'a,
{
/// Anyway nowadays we do most cleanup in the stream end, so this does nothing.
/// The risk still exists if the stream is dropped mid way though.
///
/// In this case there might exist a possibility of a futurelock too.
fn drop(&mut self) {
match self.search_stream.state() {
// Avoiding the block if this stream has already been cleaned up.
StreamState::Closed | StreamState::Error => (),
StreamState::Fresh | StreamState::Active | StreamState::Done => {
// Making this blocking call in drop is suboptimal.
// We should use async-drop, when it's stabilized:
// https://github.com/rust-lang/rust/issues/126482
//
// Previously we used `futures::block_on()` but that ran the risk of deadlocks
// (not entirely clear why). The client object is already guaranteed to be in a tokio runtime,
// and so the lifetime `'a` also guarantees that we are now in a tokio managed thread.
// Thus we can run some async code with block_in_place().
// This does necessitate that we're in a multithread executor though.
warn!("Dropping a stream mid way. Performing blocking cleanup in drop().");
let result = block_in_place(|| {
Handle::current().block_on(async move { self.cleanup().await })
});
match result {
Ok(()) => (),
Err(ldap_err) => {
// Cannot return the error from drop but at least we can log it.
error!("Error in finishing the stream: {ldap_err}")
}
}
}
}
}
}
impl<'a, S, A> StreamDropWrapper<'a, S, A>
where
S: AsRef<str> + Send + Sync + 'a,
A: AsRef<[S]> + Send + Sync + 'a,
{
///
/// Cleanup the stream. This method should be called when dropping the stream.
///
/// This method will cleanup the stream and close the connection.
///
///
/// # Error state
///
/// If the stream is already in an error state this won't do anything.
/// An Ok result is returned.
#[instrument(level = Level::TRACE, skip_all)]
async fn cleanup(&mut self) -> Result<(), LdapError> {
match self.search_stream.state() {
// `cleanup()` already called when running it to the end.
// Nothing to here anymore
StreamState::Closed => Ok(()),
// Stream ended but not yet closed.
// Doing it here.
StreamState::Done => finish_stream(&mut self.search_stream).await,
StreamState::Error => {
error!(
"Stream is in Error state. Not trying to cancel it as it could do more harm than good."
);
// We don't have an LdapError to return here.
Ok(())
}
StreamState::Fresh | StreamState::Active => {
debug!("Stream is still open. Issuing cancellation to the server.");
// Let's first call finish according to the docs.
// This probably wont do too much but it gives the adapters a chance for some cleanup.
finish_stream(&mut self.search_stream).await?;
// Then the actual cancellation.
let msgid = self.search_stream.ldap_handle().last_id();
self.search_stream.ldap_handle().abandon(msgid).await
}
}
}
}
/// Just a DRY helper for calling `finish()` on the stream.
#[instrument(level = Level::TRACE, skip_all, ret)]
async fn finish_stream<'a, S, A>(stream: &mut SearchStream<'a, S, A>) -> Result<(), LdapError>
where
S: AsRef<str> + Send + Sync + 'a,
A: AsRef<[S]> + Send + Sync + 'a,
{
// Calling this might not be strictly necessary,
// but it's probably expected so let's just do it.
// I don't think this does any networking most of the time.
let finish_result = stream.finish().await;
match finish_result.success() {
Ok(_) => Ok(()), // All good.
// This is returned if the stream is cancelled in the middle.
// Which is fine for us.
// https://ldap.com/ldap-result-code-reference-client-side-result-codes/#rc-userCanceled
Err(LdapError::LdapResult {
result: LdapResult { rc: 88, .. },
}) => Ok(()),
Err(finish_err) => Err(finish_err),
}
}
/// A helper to create native rust streams out of `ldap3::SearchStream`s.
pub(crate) fn to_native_stream<'a, S, A>(
ldap3_stream: SearchStream<'a, S, A>,
) -> Result<impl Stream<Item = Result<Record, Error>> + 'a + use<'a, S, A>, Error>
where
S: AsRef<str> + Send + Sync + 'a,
A: AsRef<[S]> + Send + Sync + 'a,
{
// This will handle stream cleanup.
let stream_wrapper = StreamDropWrapper {
search_stream: ldap3_stream,
};
// Produce the steam itself by unfolding.
let stream = futures::stream::try_unfold(stream_wrapper, async |mut search| {
match search.search_stream.next().await {
// In the middle of the stream. Produce the next result.
Ok(Some(result_entry)) => Ok(Some((
Record {
search_entry: SearchEntry::construct(result_entry),
},
search,
))),
// Stream is done.
Ok(None) => {
// Performing the cleanup here before yielding the end of the stream.
// This is nice place for this as we're already in an async context.
// The alternative is to block on this in `drop()`.
// That still has to be called because streams may be dropped mid way too,
// but running them to completion is assumed to be the common case.
//
// Actually we cannot call `self.cleanup()` here because that will send
// unnecessary search abandon if the stream had no adaptors:
// https://github.com/inejge/ldap3/issues/155
//
// Just finishing is okay though.
let cleanup_result = finish_stream(&mut search.search_stream).await;
// Doing the cleanup here (as opposed to drop) also has the advantage that we can
// return the potential error.
match cleanup_result {
Ok(()) => Ok(None),
Err(ldap_err) => Err(Error::Query(
String::from("Error finishing the streaming search"),
ldap_err,
)),
}
}
Err(ldap_error) => Err(Error::Query(
format!("Error getting next record: {ldap_error:?}"),
ldap_error,
)),
}
});
Ok(stream)
}