1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
use std::{
    borrow::Cow,
    collections::{BTreeSet, HashMap},
    sync::RwLock,
};

use testcontainers::{
    core::{ContainerState, IntoContainerPort, WaitFor},
    ContainerRequest, Image, TestcontainersError,
};

/// Available Neo4j plugins.
/// See [Neo4j operations manual](https://neo4j.com/docs/operations-manual/current/docker/operations/#docker-neo4j-plugins) for more information.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
#[non_exhaustive]
pub enum Neo4jLabsPlugin {
    /// APOC (Awesome Procedures on Cypher) **Extended** contains additional procedures and functions, which are available when you self-host the database and add the apoc-extended jar.
    /// See [`Neo4jLabsPlugin::ApocCore`] for the officially supported variant.
    /// As of `5.0` APOC has been split into separate repositories, one being the main, officially supported, `APOC Core` and the other the `APOC Extended` library.
    ///
    /// The APOC plugin provides access to user-defined procedures and functions which extend the use of the [Cypher query language] into areas such as data integration, graph algorithms, and data conversion.
    /// Please see [the APOC Extended documentation] for further details.
    ///
    /// [Cypher query language]: https://neo4j.com/docs/cypher-manual/current/introduction/
    /// [the APOC Extended documentation]: https://neo4j.com/docs/apoc/current/
    Apoc,
    /// APOC (Awesome Procedures on Cypher) **Core** are battle hardened procedures and functions that don’t have external dependencies or require configuration.
    /// This is also the basis of the functionality available in Neo4j AuraDB, which lists the available APOC surface in its docs.
    /// See [`Neo4jLabsPlugin::Apoc`] for the variant maintained by the community.
    /// As of `5.0` APOC has been split into separate repositories, one being the main, officially supported, `APOC Core` and the other the `APOC Extended` library.
    ///
    /// The APOC plugin provides access to user-defined procedures and functions which extend the use of the [Cypher query language] into areas such as data integration, graph algorithms, and data conversion.
    /// Please see [the APOC Core documentation] for further details.
    ///
    /// [Cypher query language]: https://neo4j.com/docs/cypher-manual/current/introduction/
    /// [the APOC Core documentation]: https://neo4j.com/docs/aura/platform/apoc/
    ApocCore,
    /// Bloom is a graph exploration application for visually interacting with graph data.
    /// Please see [their documentation](https://neo4j.com/docs/bloom-user-guide/current/) for further details.
    Bloom,
    /// Allows integration of Kafka and other streaming solutions with Neo4j,
    /// either to ingest data into the graph from other sources,
    /// or to send update events (change data capture - CDC) to the event log for later consumption.
    /// Please see [their documentation](https://neo4j.com/docs/kafka-streams/) for further details.
    ///
    /// <div class="warning">
    ///
    /// The Kafka Connect Neo4j Connector is the recommended method to integrate Kafka with Neo4j, as Neo4j Streams is no longer under active development and will not be supported after version 4.4 of Neo4j.
    /// The most recent version of the Kafka Connect Neo4j Connector [can be found here](https://neo4j.com/docs/kafka).
    ///
    /// </div>
    Streams,
    /// Graph Data Science (GDS) library provides efficiently implemented, parallel versions of common graph algorithms, exposed as Cypher procedures. Additionally, GDS includes machine learning pipelines to train predictive supervised models to solve graph problems, such as predicting missing relationships.
    /// Please see [their documentation](https://neo4j.com/docs/graph-data-science/current/) for further details.
    GraphDataScience,
    /// neosemantics (n10s) is a plugin that enables the use of RDF and its associated vocabularies (such as OWL, RDFS, SKOS and others) in Neo4j.
    /// [RDF is a W3C standard model](https://www.w3.org/RDF/) for data interchange.
    /// You can use n10s to build integrations with RDF-generating / RDF-consuming components.
    /// You can also use it to validate your graph against constraints expressed in [SHACL](https://www.w3.org/TR/shacl/) or to run basic inferencing.
    /// Please see [their documentation](https://neo4j.com/labs/neosemantics/) for further details.
    NeoSemantics,
    /// Allows specifying other plugins.
    /// The contained string is used verbatim as the plugin name.
    Custom(String),
}

/// Formats a plugin as the name that ends up in the `NEO4JLABS_PLUGINS` list.
impl std::fmt::Display for Neo4jLabsPlugin {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Resolve the textual name first so padding/precision handling
        // happens in exactly one place.
        let name: &str = match self {
            Self::Apoc => "apoc",
            Self::ApocCore => "apoc-core",
            Self::Bloom => "bloom",
            Self::Streams => "streams",
            Self::GraphDataScience => "graph-data-science",
            Self::NeoSemantics => "n10s",
            // Custom plugins are rendered exactly as supplied by the caller.
            Self::Custom(plugin_name) => plugin_name.as_str(),
        };
        formatter.pad(name)
    }
}

/// Neo4j image for [testcontainers](https://crates.io/crates/testcontainers).
///
/// This image is based on the official [Neo4j](https://hub.docker.com/_/neo4j) image.
/// The default user is `neo4j` and the default password is `password`.
/// The default version is `5`.
///
/// # Example
///
/// ```rust,no_run
/// use testcontainers_modules::{neo4j::Neo4j, testcontainers::runners::SyncRunner};
///
/// let container = Neo4j::default().start().unwrap();
/// let uri = format!(
///     "bolt://{}:{}",
///     container.get_host().unwrap(),
///     container.image().bolt_port_ipv4().unwrap()
/// );
/// let auth_user = container.image().user();
/// let auth_pass = container.image().password();
/// // connect to Neo4j with the uri, user and pass
/// ```
///
/// # Neo4j Version
///
/// The version of the image can be set with [`Neo4j::with_version`].
/// The default version is `5`.
/// The available versions can be found on [Docker Hub](https://hub.docker.com/_/neo4j/tags).
///
/// The used version can be retrieved with [`Neo4jImage::version`].
///
/// # Auth
///
/// The default user is `neo4j` and the default password is `password`.
/// Authentication can be turned off with [`Neo4j::without_authentication`].
///
/// The used user can be retrieved with [`Neo4jImage::user`].
/// The used password can be retrieved with [`Neo4jImage::password`].
///
/// # Environment variables
///
/// The following environment variables have been documented as supported:
///   * `NEO4J_VERSION_TAG`: The default version of the image to use.
///   * `NEO4J_TEST_USER`: The default user to use for authentication.
///   * `NEO4J_TEST_PASS`: The default password to use for authentication.
///
/// NOTE(review): nothing in this module reads these variables; [`Neo4j::new`]
/// uses fixed defaults. This section may be stale — confirm before relying on it.
///
/// # Neo4j Labs Plugins
///
/// Neo4j offers built-in support for Neo4j Labs plugins.
/// The method [`Neo4j::with_neo4j_labs_plugin`] can be used to define them.
///
/// Supported plugins are APOC, APOC Core, Bloom, Streams, Graph Data Science, and Neo Semantics.
/// Other plugins can be requested by name through [`Neo4jLabsPlugin::Custom`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Neo4j {
    /// Image tag to run.
    version: Value,
    /// User name for authentication; `None` when authentication is disabled.
    user: Option<Value>,
    /// Password for authentication; `None` when authentication is disabled.
    pass: Option<Value>,
    /// Requested plugins; the set deduplicates and orders them.
    plugins: BTreeSet<Neo4jLabsPlugin>,
}

impl Neo4j {
    const DEFAULT_USER: &'static str = "neo4j";
    const DEFAULT_PASS: &'static str = "password";
    const DEFAULT_VERSION_TAG: &'static str = "5";

    /// Create a new instance of a Neo4j image.
    ///
    /// The instance starts with the default version tag, the default user and
    /// password, and no plugins.
    #[must_use]
    pub fn new() -> Self {
        Self {
            version: Cow::Borrowed(Self::DEFAULT_VERSION_TAG),
            user: Some(Cow::Borrowed(Self::DEFAULT_USER)),
            pass: Some(Cow::Borrowed(Self::DEFAULT_PASS)),
            plugins: BTreeSet::new(),
        }
    }

    /// Set the Neo4j version to use.
    /// The value must be an existing Neo4j version tag.
    #[must_use]
    pub fn with_version(mut self, version: impl Into<Value>) -> Self {
        self.version = version.into();
        self
    }

    /// Set the username to use.
    ///
    /// Authentication is only enabled when both a user and a password are set,
    /// so after [`Self::without_authentication`] a password must be set as well.
    #[must_use]
    pub fn with_user(mut self, user: impl Into<Value>) -> Self {
        self.user = Some(user.into());
        self
    }

    /// Set the password to use.
    ///
    /// Authentication is only enabled when both a user and a password are set,
    /// so after [`Self::without_authentication`] a user must be set as well.
    #[must_use]
    pub fn with_password(mut self, pass: impl Into<Value>) -> Self {
        self.pass = Some(pass.into());
        self
    }

    /// Do not use any authentication on the testcontainer.
    ///
    /// Setting this will override any prior usages of [`Self::with_user`] and
    /// [`Self::with_password`].
    #[must_use]
    pub fn without_authentication(mut self) -> Self {
        self.user = None;
        self.pass = None;
        self
    }

    /// Add Neo4j lab plugins to get started with the database.
    ///
    /// Can be called repeatedly; plugins accumulate and duplicates are ignored.
    #[must_use]
    pub fn with_neo4j_labs_plugin(mut self, plugins: &[Neo4jLabsPlugin]) -> Self {
        self.plugins.extend(plugins.iter().cloned());
        self
    }
}

/// Configuration string that is either a borrowed `'static` literal or an owned `String`.
type Value = Cow<'static, str>;

impl Default for Neo4j {
    fn default() -> Self {
        Self::new()
    }
}

/// The actual Neo4j testcontainers image type which is returned by `container.image()`
pub struct Neo4jImage {
    /// Image tag that is run.
    version: String,
    /// `(user, password)` pair, or `None` when authentication is disabled.
    auth: Option<(String, String)>,
    /// Environment variables handed to the container.
    env_vars: HashMap<String, String>,
    /// Container state; `None` until `exec_after_start` stores it.
    state: RwLock<Option<ContainerState>>,
}

impl Neo4jImage {
    /// Return the version of the Neo4j image.
    #[must_use]
    pub fn version(&self) -> &str {
        &self.version
    }

    /// Return the user/password authentication tuple of the Neo4j server.
    /// If no authentication is set, `None` is returned.
    #[must_use]
    pub fn auth(&self) -> Option<(&str, &str)> {
        self.auth
            .as_ref()
            .map(|(user, pass)| (user.as_str(), pass.as_str()))
    }

    /// Return the user of the Neo4j server.
    /// If no authentication is set, `None` is returned.
    #[must_use]
    pub fn user(&self) -> Option<&str> {
        self.auth().map(|(user, _)| user)
    }

    /// Return the password of the Neo4j server.
    /// If no authentication is set, `None` is returned.
    #[must_use]
    pub fn password(&self) -> Option<&str> {
        self.auth().map(|(_, pass)| pass)
    }

    /// Return the port to connect to the Neo4j server via Bolt over IPv4.
    pub fn bolt_port_ipv4(&self) -> Result<u16, TestcontainersError> {
        self.state
            .read()
            .map_err(|_| TestcontainersError::other("failed to lock the sate of Neo4J"))?
            .as_ref()
            .ok_or_else(|| {
                TestcontainersError::other("Container must be started before port can be retrieved")
            })?
            .host_port_ipv4(7687.tcp())
    }

    /// Return the port to connect to the Neo4j server via Bolt over IPv6.
    pub fn bolt_port_ipv6(&self) -> Result<u16, TestcontainersError> {
        self.state
            .read()
            .map_err(|_| TestcontainersError::other("failed to lock the sate of Neo4J"))?
            .as_ref()
            .ok_or_else(|| {
                TestcontainersError::other("Container must be started before port can be retrieved")
            })?
            .host_port_ipv6(7687.tcp())
    }

    /// Return the port to connect to the Neo4j server via HTTP over IPv4.
    pub fn http_port_ipv4(&self) -> Result<u16, TestcontainersError> {
        self.state
            .read()
            .map_err(|_| TestcontainersError::other("failed to lock the sate of Neo4J"))?
            .as_ref()
            .ok_or_else(|| {
                TestcontainersError::other("Container must be started before port can be retrieved")
            })?
            .host_port_ipv4(7474.tcp())
    }

    /// Return the port to connect to the Neo4j server via HTTP over IPv6.
    pub fn http_port_ipv6(&self) -> Result<u16, TestcontainersError> {
        self.state
            .read()
            .map_err(|_| TestcontainersError::other("failed to lock the sate of Neo4J"))?
            .as_ref()
            .ok_or_else(|| {
                TestcontainersError::other("Container must be started before port can be retrieved")
            })?
            .host_port_ipv6(7474.tcp())
    }
}

impl Image for Neo4jImage {
    /// Name of the Docker image.
    fn name(&self) -> &str {
        "neo4j"
    }

    /// Tag of the Docker image, which is the configured Neo4j version.
    fn tag(&self) -> &str {
        &self.version
    }

    /// The container counts as ready once both messages were seen on stdout.
    fn ready_conditions(&self) -> Vec<WaitFor> {
        vec![
            WaitFor::message_on_stdout("Bolt enabled on"),
            WaitFor::message_on_stdout("Started."),
        ]
    }

    /// Environment variables assembled when the image was built.
    fn env_vars(
        &self,
    ) -> impl IntoIterator<Item = (impl Into<Cow<'_, str>>, impl Into<Cow<'_, str>>)> {
        &self.env_vars
    }

    /// Stores the container state so the port accessors can use it later.
    /// No commands are executed in the container.
    ///
    /// # Errors
    ///
    /// Fails if the internal state lock is poisoned.
    fn exec_after_start(
        &self,
        cs: ContainerState,
    ) -> Result<Vec<testcontainers::core::ExecCommand>, TestcontainersError> {
        self.state
            .write()
            .map_err(|_| TestcontainersError::other("failed to lock the state of Neo4J"))?
            .replace(cs);
        Ok(Vec::new())
    }
}

impl Neo4j {
    fn auth_env(&self) -> impl IntoIterator<Item = (String, String)> {
        let auth = self
            .user
            .as_ref()
            .and_then(|user| self.pass.as_ref().map(|pass| format!("{}/{}", user, pass)))
            .unwrap_or_else(|| "none".to_owned());
        Some(("NEO4J_AUTH".to_owned(), auth))
    }

    fn plugins_env(&self) -> impl IntoIterator<Item = (String, String)> {
        if self.plugins.is_empty() {
            return None;
        }

        let plugin_names = self
            .plugins
            .iter()
            .map(|p| format!("\"{}\"", p))
            .collect::<Vec<String>>()
            .join(",");

        let plugin_definition = format!("[{}]", plugin_names);

        Some(("NEO4JLABS_PLUGINS".to_owned(), plugin_definition))
    }

    fn conf_env(&self) -> impl IntoIterator<Item = (String, String)> {
        let pass = self.pass.as_ref()?;

        if pass.len() < 8 {
            Some((
                "NEO4J_dbms_security_auth__minimum__password__length".to_owned(),
                pass.len().to_string(),
            ))
        } else {
            None
        }
    }

    fn build(self) -> Neo4jImage {
        let mut env_vars = HashMap::new();

        for (key, value) in self.auth_env() {
            env_vars.insert(key, value);
        }

        for (key, value) in self.plugins_env() {
            env_vars.insert(key, value);
        }

        for (key, value) in self.conf_env() {
            env_vars.insert(key, value);
        }

        let auth = self
            .user
            .and_then(|user| self.pass.map(|pass| (user.into_owned(), pass.into_owned())));

        let version = self.version.into_owned();

        Neo4jImage {
            version,
            auth,
            env_vars,
            state: RwLock::new(None),
        }
    }
}

impl From<Neo4j> for Neo4jImage {
    fn from(neo4j: Neo4j) -> Self {
        neo4j.build()
    }
}

impl From<Neo4j> for ContainerRequest<Neo4jImage> {
    fn from(neo4j: Neo4j) -> Self {
        Self::from(neo4j.build())
    }
}

/// Debug output lists the version, the credentials and the environment
/// variables; the `state` field is not included.
impl std::fmt::Debug for Neo4jImage {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut output = f.debug_struct("Neo4jImage");
        output.field("version", &self.version);
        // Use the borrowed accessor so the tuple is printed as string slices.
        output.field("auth", &self.auth());
        output.field("env_vars", &self.env_vars);
        output.finish()
    }
}

// Unit tests for the builder and the generated environment, plus one
// integration test that starts a real container.
#[cfg(test)]
mod tests {
    use neo4rs::Graph;

    use super::*;
    use crate::testcontainers::runners::AsyncRunner;

    // A full version string is passed through unchanged.
    #[test]
    fn set_valid_version() {
        let neo4j = Neo4j::new().with_version("4.2.0").build();
        assert_eq!(neo4j.version, "4.2.0");
    }

    // Partial version tags are passed through unchanged as well.
    #[test]
    fn set_partial_version() {
        let neo4j = Neo4j::new().with_version("4.2").build();
        assert_eq!(neo4j.version, "4.2");

        let neo4j = Neo4j::new().with_version("4").build();
        assert_eq!(neo4j.version, "4");
    }

    // Overriding only the user keeps the default password.
    #[test]
    fn set_user() {
        let neo4j = Neo4j::new().with_user("Benutzer").build();
        assert_eq!(neo4j.user(), Some("Benutzer"));
        assert_eq!(neo4j.auth(), Some(("Benutzer", "password")));
        assert_eq!(
            neo4j.env_vars.get("NEO4J_AUTH").unwrap(),
            "Benutzer/password"
        );
    }

    // Overriding only the password keeps the default user.
    #[test]
    fn set_password() {
        let neo4j = Neo4j::new().with_password("Passwort").build();
        assert_eq!(neo4j.password(), Some("Passwort"));
        assert_eq!(neo4j.auth(), Some(("neo4j", "Passwort")));
        assert_eq!(neo4j.env_vars.get("NEO4J_AUTH").unwrap(), "neo4j/Passwort");
    }

    // A password shorter than 8 sets the minimum password length to its own length.
    #[test]
    fn set_short_password() {
        let neo4j = Neo4j::new().with_password("1337").build();
        assert_eq!(neo4j.password(), Some("1337"));
        assert_eq!(neo4j.auth(), Some(("neo4j", "1337")));
        assert_eq!(
            neo4j
                .env_vars
                .get("NEO4J_dbms_security_auth__minimum__password__length")
                .unwrap(),
            "4"
        );
    }

    // Disabling authentication clears the credentials and sets `NEO4J_AUTH` to `none`.
    #[test]
    fn disable_auth() {
        let neo4j = Neo4j::new().without_authentication().build();
        assert_eq!(neo4j.password(), None);
        assert_eq!(neo4j.user(), None);
        assert_eq!(neo4j.auth(), None);
        assert_eq!(neo4j.env_vars.get("NEO4J_AUTH").unwrap(), "none");
    }

    // A single plugin is rendered as a one-element list.
    #[test]
    fn single_plugin_definition() {
        let neo4j = Neo4j::new()
            .with_neo4j_labs_plugin(&[Neo4jLabsPlugin::Apoc])
            .build();
        assert_eq!(
            neo4j.env_vars.get("NEO4JLABS_PLUGINS").unwrap(),
            "[\"apoc\"]"
        );
    }

    // Several plugins from one call are rendered as a comma-separated list.
    #[test]
    fn multiple_plugin_definition() {
        let neo4j = Neo4j::new()
            .with_neo4j_labs_plugin(&[Neo4jLabsPlugin::Apoc, Neo4jLabsPlugin::Bloom])
            .build();
        assert_eq!(
            neo4j.env_vars.get("NEO4JLABS_PLUGINS").unwrap(),
            "[\"apoc\",\"bloom\"]"
        );
    }

    // Repeated calls accumulate plugins and drop duplicates.
    #[test]
    fn multiple_wiht_plugin_calls() {
        let neo4j = Neo4j::new()
            .with_neo4j_labs_plugin(&[Neo4jLabsPlugin::Apoc])
            .with_neo4j_labs_plugin(&[Neo4jLabsPlugin::Bloom])
            .with_neo4j_labs_plugin(&[Neo4jLabsPlugin::Apoc])
            .build();
        assert_eq!(
            neo4j.env_vars.get("NEO4JLABS_PLUGINS").unwrap(),
            "[\"apoc\",\"bloom\"]"
        );
    }

    // Integration test: starts the default container, connects over Bolt with
    // the default credentials and runs a trivial query.
    #[tokio::test]
    async fn it_works() -> Result<(), Box<dyn std::error::Error + 'static>> {
        let container = Neo4j::default().start().await?;

        let uri = format!(
            "bolt://{}:{}",
            container.get_host().await?,
            container.image().bolt_port_ipv4()?
        );

        let auth_user = container.image().user().expect("default user");
        let auth_pass = container.image().password().expect("default password");

        let graph = Graph::new(uri, auth_user, auth_pass).await.unwrap();
        let mut result = graph.execute(neo4rs::query("RETURN 1")).await.unwrap();
        let row = result.next().await.unwrap().unwrap();
        let value: i64 = row.get("1").unwrap();
        assert_eq!(1, value);
        Ok(())
    }
}