use crate::annis::db;
use crate::annis::db::annostorage::AnnoStorage;
use crate::annis::db::aql;
use crate::annis::db::aql::operators;
use crate::annis::db::aql::operators::RangeSpec;
use crate::annis::db::exec::nodesearch::NodeSearchSpec;
use crate::annis::db::plan::ExecutionPlan;
use crate::annis::db::query;
use crate::annis::db::query::conjunction::Conjunction;
use crate::annis::db::query::disjunction::Disjunction;
use crate::annis::db::relannis;
use crate::annis::db::token_helper::TokenHelper;
use crate::annis::db::{AnnotationStorage, Graph, Match, ANNIS_NS, NODE_TYPE};
use crate::annis::errors::ErrorKind;
use crate::annis::errors::*;
use crate::annis::types::AnnoKey;
use crate::annis::types::{
    Annotation, Component, ComponentType, CountExtra, Edge, FrequencyTable, NodeID,
    QueryAttributeDescription,
};
use crate::annis::util;
use crate::annis::util::memory_estimation;
use crate::annis::util::quicksort;
use crate::malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
use crate::update::GraphUpdate;
use fs2::FileExt;
use linked_hash_map::LinkedHashMap;
use std;
use std::collections::{BTreeSet, HashSet};
use std::fmt;
use std::fs::File;
use std::fs::OpenOptions;
use std::iter::FromIterator;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::{Arc, Condvar, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::thread;

use rustc_hash::FxHashMap;

use rand;
use rand::seq::SliceRandom;
use sys_info;

enum CacheEntry {
    Loaded(Graph),
    NotLoaded,
}

/// Indicates if the corpus is partially or fully loaded into the main memory cache.
#[derive(Debug, Ord, Eq, PartialOrd, PartialEq)]
pub enum LoadStatus {
    /// Corpus is not loaded into main memory at all.
    NotLoaded,
    /// Corpus is partially loaded and is estimated to use the given amount of main memory in bytes.
    /// Partially means that the node annotations are and optionally some graph storages are loaded.
    PartiallyLoaded(usize),
    /// Corpus is fully loaded (node annotation information and all graph storages) and is estimated to use the given amount of main memory in bytes.
    FullyLoaded(usize),
}

/// Information about a single graph storage of the corpus.
#[derive(Ord, Eq, PartialOrd, PartialEq)]
pub struct GraphStorageInfo {
    /// The component this graph storage belongs to.
    pub component: Component,
    /// Indicates if the graph storage is loaded or not.
    pub load_status: LoadStatus,
    /// Number of edge annotations in this graph storage.
    pub number_of_annotations: usize,
    /// Name of the implementation
    pub implementation: String,
}

impl fmt::Display for GraphStorageInfo {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        writeln!(
            f,
            "Component {}: {} annnotations",
            self.component, self.number_of_annotations
        )?;
        writeln!(f, "Implementation: {}", self.implementation)?;
        match self.load_status {
            LoadStatus::NotLoaded => writeln!(f, "Not Loaded")?,
            LoadStatus::PartiallyLoaded(memory_size) => {
                writeln!(f, "Status: {:?}", "partially loaded")?;
                writeln!(
                    f,
                    "Memory: {:.2} MB",
                    memory_size as f64 / f64::from(1024 * 1024)
                )?;
            }
            LoadStatus::FullyLoaded(memory_size) => {
                writeln!(f, "Status: {:?}", "fully loaded")?;
                writeln!(
                    f,
                    "Memory: {:.2} MB",
                    memory_size as f64 / f64::from(1024 * 1024)
                )?;
            }
        };
        Ok(())
    }
}

/// Information about a corpus that is part of the corpus storage.
#[derive(Ord, Eq, PartialOrd, PartialEq)]
pub struct CorpusInfo {
    /// Name of the corpus.
    pub name: String,
    /// Indicates if the corpus is partially or fully loaded.
    pub load_status: LoadStatus,
    /// A list of descriptions for the graph storages of this corpus.
    pub graphstorages: Vec<GraphStorageInfo>,
}

impl fmt::Display for CorpusInfo {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self.load_status {
            LoadStatus::NotLoaded => writeln!(f, "Not Loaded")?,
            LoadStatus::PartiallyLoaded(memory_size) => {
                writeln!(f, "Status: {:?}", "partially loaded")?;
                writeln!(
                    f,
                    "Total memory: {:.2} MB",
                    memory_size as f64 / f64::from(1024 * 1024)
                )?;
            }
            LoadStatus::FullyLoaded(memory_size) => {
                writeln!(f, "Status: {:?}", "fully loaded")?;
                writeln!(
                    f,
                    "Total memory: {:.2} MB",
                    memory_size as f64 / f64::from(1024 * 1024)
                )?;
            }
        };
        if !self.graphstorages.is_empty() {
            writeln!(f, "------------")?;
            for gs in &self.graphstorages {
                write!(f, "{}", gs)?;
                writeln!(f, "------------")?;
            }
        }
        Ok(())
    }
}

/// Defines the order of results of a `find` query.
#[derive(Debug, PartialEq, Clone, Copy)]
#[repr(C)]
pub enum ResultOrder {
    /// Order results by their document name and the text position of the match.
    Normal,
    /// Inverts the order of `Normal`.
    Inverted,
    /// A random ordering which is **not stable**. Each new query will result in a different order.
    Randomized,
    /// Results are not ordered at all, but also not actively randomized.
    /// Each new query *might* result in a different order.
    NotSorted,
}

struct PreparationResult<'a> {
    query: Disjunction<'a>,
    db_entry: Arc<RwLock<CacheEntry>>,
}

/// Definition of a single attribute of a frequency query.
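///
/// # Example
///
/// A small sketch (not part of the original docs) showing how such an entry is
/// parsed from the textual `node:annotation` form via the `FromStr`
/// implementation below. The re-export path is an assumption.
///
/// ```ignore
/// use graphannis::corpusstorage::FrequencyDefEntry;
///
/// // Refer to query node "1" and the annotation "pos" (no namespace given).
/// let def: FrequencyDefEntry = "1:pos".parse().expect("valid definition");
/// assert_eq!(def.node_ref, "1");
/// assert_eq!(def.name, "pos");
/// assert_eq!(def.ns, None);
/// ```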
#[derive(Debug)]
pub struct FrequencyDefEntry {
    /// The namespace of the annotation from which the attribute value is generated.
    pub ns: Option<String>,
    /// The name of the annotation from which the attribute value is generated.
    pub name: String,
    /// The name of the query node from which the attribute value is generated.
    pub node_ref: String,
}

impl FromStr for FrequencyDefEntry {
    type Err = Error;
    fn from_str(s: &str) -> std::result::Result<FrequencyDefEntry, Self::Err> {
        let parts: Vec<&str> = s.splitn(2, ':').collect();
        if parts.len() != 2 {
            return Err("Frequency definition must consist of two parts: \
                        the referenced node and the annotation name or \"tok\" separated by \":\""
                .into());
        }
        let node_ref = parts[0];
        let anno_key = util::split_qname(parts[1]);

        Ok(FrequencyDefEntry {
            ns: anno_key.0.map(String::from),
            name: String::from(anno_key.1),
            node_ref: String::from(node_ref),
        })
    }
}

/// An enum over all supported query languages of graphANNIS.
///
/// Currently, only the ANNIS Query Language (AQL) and its variants are supported, but this enum allows us to add support for older query language versions
/// or completely new query languages.
#[repr(C)]
#[derive(Clone, Copy)]
pub enum QueryLanguage {
    AQL,
    /// Emulates the (sometimes problematic) behavior of AQL used in ANNIS 3
    AQLQuirksV3,
}

/// An enum of all supported input formats of graphANNIS.
#[repr(C)]
#[derive(Clone, Copy)]
pub enum ImportFormat {
    /// Legacy [relANNIS import file format](http://korpling.github.io/ANNIS/doc/dev-annisimportformat.html)
    RelANNIS,
}

/// Different strategies for deciding when corpora need to be removed from the cache.
pub enum CacheStrategy {
    /// Fixed maximum size of the cache in bytes.
    /// Before and after a new entry is loaded, the cache is shrunk to at most this size.
    /// The loaded entry is always added to the cache, even if the single corpus is larger than the maximum size.
    FixedMaxMemory(usize),
    /// Maximum percent of the current free space/memory available.
    /// E.g., if the percentage is 25 and there are 4.5 GB of free memory not used by the cache itself, the cache will use at most 1.125 GB of memory.
    /// Cache size is checked before and after a corpus is loaded.
    /// The loaded entry is always added to the cache, even if the single corpus is larger than the maximum size.
    PercentOfFreeMemory(f64),
}

/// A thread-safe API for managing corpora stored in a common location on the file system.
///
/// Multiple corpora can be part of a corpus storage and they are identified by their unique name.
/// Corpora are loaded from disk into main memory on demand:
/// An internal main memory cache is used to avoid re-loading a recently queried corpus from disk again.
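///
/// # Example
///
/// A minimal usage sketch (not part of the original docs). It assumes that
/// `CorpusStorage` is re-exported at the crate root and that the hypothetical
/// directory `/tmp/annis-db` already exists.
///
/// ```ignore
/// use graphannis::CorpusStorage;
/// use std::path::Path;
///
/// fn list_corpora() -> Result<(), Box<dyn std::error::Error>> {
///     // Open the storage with an automatically sized cache and parallel joins enabled.
///     let cs = CorpusStorage::with_auto_cache_size(Path::new("/tmp/annis-db"), true)?;
///     // Print the name of each corpus found in the storage directory.
///     for info in cs.list()? {
///         println!("{}", info.name);
///     }
///     Ok(())
/// }
/// ```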
pub struct CorpusStorage {
    db_dir: PathBuf,
    lock_file: File,
    cache_strategy: CacheStrategy,
    corpus_cache: RwLock<LinkedHashMap<String, Arc<RwLock<CacheEntry>>>>,
    query_config: query::Config,
    active_background_workers: Arc<(Mutex<usize>, Condvar)>,
}

impl CorpusStorage {
    /// Create a new instance with a given cache strategy for the internal corpus cache.
    ///
    /// - `db_dir` - The path on the filesystem where the corpus storage content is located. Must be an existing directory.
    /// - `cache_strategy`: A strategy for clearing the cache.
    /// - `use_parallel_joins` - If `true` parallel joins are used by the system, using all available cores.
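    ///
    /// # Example
    ///
    /// A sketch (not part of the original docs) that limits the cache to a fixed
    /// size; it assumes `CacheStrategy` is re-exported from the
    /// `graphannis::corpusstorage` module and that the given directory exists.
    ///
    /// ```ignore
    /// use graphannis::CorpusStorage;
    /// use graphannis::corpusstorage::CacheStrategy;
    /// use std::path::Path;
    ///
    /// // Keep at most 2 GiB of corpus data in the in-memory cache.
    /// let cs = CorpusStorage::with_cache_strategy(
    ///     Path::new("/tmp/annis-db"),
    ///     CacheStrategy::FixedMaxMemory(2 * 1024 * 1024 * 1024),
    ///     true,
    /// ).expect("could not open corpus storage");
    /// ```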
    pub fn with_cache_strategy(
        db_dir: &Path,
        cache_strategy: CacheStrategy,
        use_parallel_joins: bool,
    ) -> Result<CorpusStorage> {
        let query_config = query::Config { use_parallel_joins };

        #[cfg_attr(feature = "cargo-clippy", allow(clippy))]
        let active_background_workers = Arc::new((Mutex::new(0), Condvar::new()));
        let cs = CorpusStorage {
            db_dir: PathBuf::from(db_dir),
            lock_file: create_lockfile_for_directory(db_dir)?,
            cache_strategy,
            corpus_cache: RwLock::new(LinkedHashMap::new()),
            query_config,
            active_background_workers,
        };

        Ok(cs)
    }

    /// Create a new instance with an automatically determined size of the internal corpus cache.
    ///
    /// Currently, this sets the maximum cache size to 25% of the available/free memory at construction time.
    /// This behavior can change in the future.
    ///
    /// - `db_dir` - The path on the filesystem where the corpus storage content is located. Must be an existing directory.
    /// - `use_parallel_joins` - If `true` parallel joins are used by the system, using all available cores.
    pub fn with_auto_cache_size(db_dir: &Path, use_parallel_joins: bool) -> Result<CorpusStorage> {
        let query_config = query::Config { use_parallel_joins };

        // get the amount of available memory, use a quarter of it per default
        let cache_strategy: CacheStrategy = CacheStrategy::PercentOfFreeMemory(25.0);

        #[cfg_attr(feature = "cargo-clippy", allow(clippy))]
        let active_background_workers = Arc::new((Mutex::new(0), Condvar::new()));

        let cs = CorpusStorage {
            db_dir: PathBuf::from(db_dir),
            lock_file: create_lockfile_for_directory(db_dir)?,
            cache_strategy,
            corpus_cache: RwLock::new(LinkedHashMap::new()),
            query_config,
            active_background_workers,
        };

        Ok(cs)
    }

    /// List all available corpora in the corpus storage.
    pub fn list(&self) -> Result<Vec<CorpusInfo>> {
        let names: Vec<String> = self.list_from_disk().unwrap_or_default();
        let mut result: Vec<CorpusInfo> = vec![];

        let mut mem_ops =
            MallocSizeOfOps::new(memory_estimation::platform::usable_size, None, None);

        for n in names {
            if let Ok(corpus_info) = self.create_corpus_info(&n, &mut mem_ops) {
                result.push(corpus_info);
            }
        }

        Ok(result)
    }

    fn list_from_disk(&self) -> Result<Vec<String>> {
        let mut corpora: Vec<String> = Vec::new();
        for c_dir in self.db_dir.read_dir().chain_err(|| {
            format!(
                "Listing directories from {} failed",
                self.db_dir.to_string_lossy()
            )
        })? {
            let c_dir = c_dir.chain_err(|| {
                format!(
                    "Could not get directory entry of folder {}",
                    self.db_dir.to_string_lossy()
                )
            })?;
            let ftype = c_dir.file_type().chain_err(|| {
                format!(
                    "Could not determine file type for {}",
                    c_dir.path().to_string_lossy()
                )
            })?;
            if ftype.is_dir() {
                let corpus_name = c_dir.file_name().to_string_lossy().to_string();
                corpora.push(corpus_name);
            }
        }
        Ok(corpora)
    }

    fn create_corpus_info(
        &self,
        corpus_name: &str,
        mem_ops: &mut MallocSizeOfOps,
    ) -> Result<CorpusInfo> {
        let cache_entry = self.get_entry(corpus_name)?;
        let lock = cache_entry.read().unwrap();
        let corpus_info: CorpusInfo = match &*lock {
            CacheEntry::Loaded(ref db) => {
                // check if all components are loaded
                let heap_size = db.size_of(mem_ops);
                let mut load_status = LoadStatus::FullyLoaded(heap_size);

                let mut graphstorages = Vec::new();
                for c in db.get_all_components(None, None) {
                    if let Some(gs) = db.get_graphstorage_as_ref(&c) {
                        graphstorages.push(GraphStorageInfo {
                            component: c.clone(),
                            load_status: LoadStatus::FullyLoaded(gs.size_of(mem_ops)),
                            number_of_annotations: gs.get_anno_storage().number_of_annotations(),
                            implementation: gs.serialization_id().clone(),
                        });
                    } else {
                        load_status = LoadStatus::PartiallyLoaded(heap_size);
                        graphstorages.push(GraphStorageInfo {
                            component: c.clone(),
                            load_status: LoadStatus::NotLoaded,
                            number_of_annotations: 0,
                            implementation: "".to_owned(),
                        })
                    }
                }

                CorpusInfo {
                    name: corpus_name.to_owned(),
                    load_status,
                    graphstorages,
                }
            }
            &CacheEntry::NotLoaded => CorpusInfo {
                name: corpus_name.to_owned(),
                load_status: LoadStatus::NotLoaded,
                graphstorages: vec![],
            },
        };
        Ok(corpus_info)
    }

    /// Return detailed information about a specific corpus with a given name (`corpus_name`).
    pub fn info(&self, corpus_name: &str) -> Result<CorpusInfo> {
        let mut mem_ops =
            MallocSizeOfOps::new(memory_estimation::platform::usable_size, None, None);
        self.create_corpus_info(corpus_name, &mut mem_ops)
    }

    fn get_entry(&self, corpus_name: &str) -> Result<Arc<RwLock<CacheEntry>>> {
        let corpus_name = corpus_name.to_string();

        {
            // test with read-only access if corpus is contained in cache
            let cache_lock = self.corpus_cache.read().unwrap();
            let cache = &*cache_lock;
            if let Some(e) = cache.get(&corpus_name) {
                return Ok(e.clone());
            }
        }

        // if not yet available, change to write-lock and insert cache entry
        let mut cache_lock = self.corpus_cache.write().unwrap();
        let cache = &mut *cache_lock;

        let entry = cache
            .entry(corpus_name.clone())
            .or_insert_with(|| Arc::new(RwLock::new(CacheEntry::NotLoaded)));

        Ok(entry.clone())
    }

    fn load_entry_with_lock(
        &self,
        cache_lock: &mut RwLockWriteGuard<LinkedHashMap<String, Arc<RwLock<CacheEntry>>>>,
        corpus_name: &str,
        create_if_missing: bool,
    ) -> Result<Arc<RwLock<CacheEntry>>> {
        let cache = &mut *cache_lock;

        // if not loaded yet, get write-lock and load entry
        let db_path: PathBuf = [self.db_dir.to_string_lossy().as_ref(), &corpus_name]
            .iter()
            .collect();

        let create_corpus = if db_path.is_dir() {
            false
        } else if create_if_missing {
            true
        } else {
            return Err(ErrorKind::NoSuchCorpus(corpus_name.to_string()).into());
        };

        // make sure the cache is not too large before adding the new corpus
        check_cache_size_and_remove_with_cache(cache, &self.cache_strategy, vec![]);

        let db = if create_corpus {
            // create the default graph storages that are assumed to exist in every corpus
            let mut db = Graph::with_default_graphstorages()?;

            // save corpus to the path where it should be stored
            db.persist_to(&db_path)
                .chain_err(|| format!("Could not create corpus with name {}", corpus_name))?;
            db
        } else {
            let mut db = Graph::new();
            db.load_from(&db_path, false)?;
            db
        };

        let entry = Arc::new(RwLock::new(CacheEntry::Loaded(db)));
        // first remove the entry, then add it: this ensures it is at the end of the linked hash map
        cache.remove(corpus_name);
        cache.insert(String::from(corpus_name), entry.clone());

        check_cache_size_and_remove_with_cache(cache, &self.cache_strategy, vec![corpus_name]);

        Ok(entry)
    }

    fn get_loaded_entry(
        &self,
        corpus_name: &str,
        create_if_missing: bool,
    ) -> Result<Arc<RwLock<CacheEntry>>> {
        let cache_entry = self.get_entry(corpus_name)?;

        // check if basics (node annotation, strings) of the database are loaded
        let loaded = {
            let lock = cache_entry.read().unwrap();
            match &*lock {
                CacheEntry::Loaded(_) => true,
                _ => false,
            }
        };

        if loaded {
            Ok(cache_entry)
        } else {
            let mut cache_lock = self.corpus_cache.write().unwrap();
            self.load_entry_with_lock(&mut cache_lock, corpus_name, create_if_missing)
        }
    }

    fn get_loaded_entry_with_components(
        &self,
        corpus_name: &str,
        components: Vec<Component>,
    ) -> Result<Arc<RwLock<CacheEntry>>> {
        let db_entry = self.get_loaded_entry(corpus_name, false)?;
        let missing_components = {
            let lock = db_entry.read().unwrap();
            let db = get_read_or_error(&lock)?;

            let mut missing: HashSet<Component> = HashSet::new();
            for c in components {
                if !db.is_loaded(&c) {
                    missing.insert(c);
                }
            }
            missing
        };
        if !missing_components.is_empty() {
            // load the needed components
            let mut lock = db_entry.write().unwrap();
            let db = get_write_or_error(&mut lock)?;
            for c in missing_components {
                db.ensure_loaded(&c)?;
            }
        };

        Ok(db_entry)
    }

    fn get_fully_loaded_entry(&self, corpus_name: &str) -> Result<Arc<RwLock<CacheEntry>>> {
        let db_entry = self.get_loaded_entry(corpus_name, false)?;
        let missing_components = {
            let lock = db_entry.read().unwrap();
            let db = get_read_or_error(&lock)?;

            let mut missing: HashSet<Component> = HashSet::new();
            for c in db.get_all_components(None, None) {
                if !db.is_loaded(&c) {
                    missing.insert(c);
                }
            }
            missing
        };
        if !missing_components.is_empty() {
            // load the needed components
            let mut lock = db_entry.write().unwrap();
            let db = get_write_or_error(&mut lock)?;
            for c in missing_components {
                db.ensure_loaded(&c)?;
            }
        };

        Ok(db_entry)
    }

    /// Import a corpus from an external location on the file system into this corpus storage.
    ///
    /// - `path` - The location on the file system where the corpus data is located.
    /// - `format` - The format in which this corpus data is stored.
    /// - `corpus_name` - Optionally override the name of the new corpus for file formats that already provide a corpus name.
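    ///
    /// # Example
    ///
    /// A sketch (not part of the original docs); the corpus path and name are
    /// hypothetical and `cs` is an already constructed `CorpusStorage`.
    ///
    /// ```ignore
    /// use graphannis::corpusstorage::ImportFormat;
    /// use std::path::Path;
    ///
    /// // Import a relANNIS corpus and override its name.
    /// let name = cs.import_from_fs(
    ///     Path::new("/data/GUM_relannis"),
    ///     ImportFormat::RelANNIS,
    ///     Some("GUM".to_string()),
    /// )?;
    /// println!("imported corpus {}", name);
    /// ```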
    pub fn import_from_fs(
        &self,
        path: &Path,
        format: ImportFormat,
        corpus_name: Option<String>,
    ) -> Result<String> {
        let (orig_name, mut graph) = match format {
            ImportFormat::RelANNIS => relannis::load(path, |status| {
                info!("{}", status);
                // loading the file from relANNIS consumes memory, update the corpus cache regularly to allow it to adapt
                self.check_cache_size_and_remove(vec![]);
            })?,
        };

        let r = graph.ensure_loaded_all();
        if let Err(e) = r {
            error!(
                "Some error occured when attempting to load components from disk: {:?}",
                e
            );
        }

        let corpus_name = corpus_name.unwrap_or(orig_name);

        let mut db_path = PathBuf::from(&self.db_dir);
        db_path.push(corpus_name.clone());

        let mut cache_lock = self.corpus_cache.write().unwrap();
        let cache = &mut *cache_lock;

        // make sure the cache is not too large before adding the new corpus
        check_cache_size_and_remove_with_cache(cache, &self.cache_strategy, vec![]);

        // remove any possible old corpus
        let old_entry = cache.remove(&corpus_name);
        if old_entry.is_some() {
            if let Err(e) = std::fs::remove_dir_all(&db_path) {
                error!("Error when removing existing files {}", e);
            }
        }

        if let Err(e) = std::fs::create_dir_all(&db_path) {
            error!(
                "Can't create directory {}: {:?}",
                db_path.to_string_lossy(),
                e
            );
        }

        // save to its location
        let save_result = graph.save_to(&db_path);
        if let Err(e) = save_result {
            error!(
                "Can't save corpus to {}: {:?}",
                db_path.to_string_lossy(),
                e
            );
        }

        // make it known to the cache
        cache.insert(
            corpus_name.clone(),
            Arc::new(RwLock::new(CacheEntry::Loaded(graph))),
        );
        check_cache_size_and_remove_with_cache(cache, &self.cache_strategy, vec![&corpus_name]);

        Ok(corpus_name)
    }

    /// Delete a corpus from this corpus storage.
    /// Returns `true` if the corpus was successfully deleted and `false` if no such corpus existed.
    pub fn delete(&self, corpus_name: &str) -> Result<bool> {
        let mut db_path = PathBuf::from(&self.db_dir);
        db_path.push(corpus_name);

        let mut cache_lock = self.corpus_cache.write().unwrap();

        let cache = &mut *cache_lock;

        // remove any possible old corpus
        if let Some(db_entry) = cache.remove(corpus_name) {
            // acquire an exclusive lock for this cache entry because other queries
            // or the background writer might still access it and need to finish first
            let mut _lock = db_entry.write().unwrap();

            if db_path.is_dir() {
                std::fs::remove_dir_all(&db_path)
                    .chain_err(|| "Error when removing existing files")?
            }

            Ok(true)
        } else {
            Ok(false)
        }
    }

    /// Apply a sequence of updates (`update` parameter) to this graph for a corpus given by the `corpus_name` parameter.
    ///
    /// It is ensured that the update process is atomic and that the changes are persisted to disk if the result is `Ok`.
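    ///
    /// # Example
    ///
    /// A sketch (not part of the original docs) that adds a single node to a
    /// (possibly new) corpus. It assumes the `graphannis::update` module exposes
    /// `GraphUpdate` and an `UpdateEvent::AddNode` variant with these fields, and
    /// that `cs` is an existing `CorpusStorage`.
    ///
    /// ```ignore
    /// use graphannis::update::{GraphUpdate, UpdateEvent};
    ///
    /// let mut update = GraphUpdate::new();
    /// // Queue a single node addition; the node name is hypothetical.
    /// update.add_event(UpdateEvent::AddNode {
    ///     node_name: "mycorpus/doc1#n1".to_string(),
    ///     node_type: "node".to_string(),
    /// });
    /// // Apply the update atomically; the corpus is created if missing.
    /// cs.apply_update("mycorpus", &mut update)?;
    /// ```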
    pub fn apply_update(&self, corpus_name: &str, update: &mut GraphUpdate) -> Result<()> {
        let db_entry = self
            .get_loaded_entry(corpus_name, true)
            .chain_err(|| format!("Could not get loaded entry for corpus {}", corpus_name))?;
        {
            let mut lock = db_entry.write().unwrap();
            let db: &mut Graph = get_write_or_error(&mut lock)?;

            db.apply_update(update)?;
        }
        // start a background thread to persist the results

        let active_background_workers = self.active_background_workers.clone();
        {
            let &(ref lock, ref _cvar) = &*active_background_workers;
            let mut nr_active_background_workers = lock.lock().unwrap();
            *nr_active_background_workers += 1;
        }
        thread::spawn(move || {
            trace!("Starting background thread to sync WAL updates");
            let lock = db_entry.read().unwrap();
            if let Ok(db) = get_read_or_error(&lock) {
                let db: &Graph = db;
                if let Err(e) = db.background_sync_wal_updates() {
                    error!("Can't sync changes in background thread: {:?}", e);
                } else {
                    trace!("Finished background thread to sync WAL updates");
                }
            }
            let &(ref lock, ref cvar) = &*active_background_workers;
            let mut nr_active_background_workers = lock.lock().unwrap();
            *nr_active_background_workers -= 1;
            cvar.notify_all();
        });

        Ok(())
    }

    fn prepare_query<'a>(
        &self,
        corpus_name: &str,
        query: &'a str,
        query_language: QueryLanguage,
        additional_components: Vec<Component>,
    ) -> Result<PreparationResult<'a>> {
        let db_entry = self.get_loaded_entry(corpus_name, false)?;

        // make sure the database is loaded with all necessary components
        let (q, missing_components) = {
            let lock = db_entry.read().unwrap();
            let db = get_read_or_error(&lock)?;

            let q = match query_language {
                QueryLanguage::AQL => aql::parse(query, false)?,
                QueryLanguage::AQLQuirksV3 => aql::parse(query, true)?,
            };

            let necessary_components = q.necessary_components(db);

            let mut missing: HashSet<Component> =
                HashSet::from_iter(necessary_components.iter().cloned());

            // make sure the additional components are loaded
            missing.extend(additional_components.into_iter());

            // remove all that are already loaded
            for c in &necessary_components {
                if db.get_graphstorage(c).is_some() {
                    missing.remove(c);
                }
            }
            let missing: Vec<Component> = missing.into_iter().collect();
            (q, missing)
        };

        if !missing_components.is_empty() {
            // load the needed components
            {
                let mut lock = db_entry.write().unwrap();
                let db = get_write_or_error(&mut lock)?;
                for c in missing_components {
                    db.ensure_loaded(&c)?;
                }
            }
            self.check_cache_size_and_remove(vec![corpus_name]);
        };

        Ok(PreparationResult { query: q, db_entry })
    }

    /// Preloads all annotation and graph storages from the disk into a main memory cache.
    pub fn preload(&self, corpus_name: &str) -> Result<()> {
        {
            let db_entry = self.get_loaded_entry(corpus_name, false)?;
            let mut lock = db_entry.write().unwrap();
            let db = get_write_or_error(&mut lock)?;
            db.ensure_loaded_all()?;
        }
        self.check_cache_size_and_remove(vec![corpus_name]);
        Ok(())
    }

    /// Unloads a corpus from the cache.
    pub fn unload(&self, corpus_name: &str) {
        let mut cache_lock = self.corpus_cache.write().unwrap();
        let cache = &mut *cache_lock;
        cache.remove(corpus_name);
    }

    /// Update the graph and annotation statistics for a corpus given by `corpus_name`.
    pub fn update_statistics(&self, corpus_name: &str) -> Result<()> {
        let db_entry = self.get_loaded_entry(corpus_name, false)?;
        let mut lock = db_entry.write().unwrap();
        let db: &mut Graph = get_write_or_error(&mut lock)?;

        Arc::make_mut(&mut db.node_annos).calculate_statistics();
        for c in db.get_all_components(None, None) {
            db.calculate_component_statistics(&c)?;
        }

        // TODO: persist changes

        Ok(())
    }

    /// Parses a `query` and checks if it is valid.
    ///
    /// - `corpus_name` - The name of the corpus the query would be executed on (needed because missing annotation names can be a semantic parser error).
    /// - `query` - The query as string.
    /// - `query_language` The query language of the query (e.g. AQL).
    ///
    /// Returns `true` if valid and an error with the parser message if invalid.
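    ///
    /// # Example
    ///
    /// A sketch (not part of the original docs); `cs` is an existing
    /// `CorpusStorage` containing a corpus named "mycorpus".
    ///
    /// ```ignore
    /// use graphannis::corpusstorage::QueryLanguage;
    ///
    /// // A syntactically broken query returns an Err with the parser message ...
    /// assert!(cs.validate_query("mycorpus", "tok &", QueryLanguage::AQL).is_err());
    /// // ... while a well-formed query validates to true.
    /// assert!(cs.validate_query("mycorpus", "tok", QueryLanguage::AQL)?);
    /// ```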
    pub fn validate_query(
        &self,
        corpus_name: &str,
        query: &str,
        query_language: QueryLanguage,
    ) -> Result<bool> {
        let prep: PreparationResult =
            self.prepare_query(corpus_name, query, query_language, vec![])?;
        // also get the semantic errors by creating an execution plan on the actual Graph
        let lock = prep.db_entry.read().unwrap();
        let db = get_read_or_error(&lock)?;
        ExecutionPlan::from_disjunction(&prep.query, &db, &self.query_config)?;
        Ok(true)
    }

    /// Returns a string representation of the execution plan for a `query`.
    ///
    /// - `corpus_name` - The name of the corpus to execute the query on.
    /// - `query` - The query as string.
    /// - `query_language` The query language of the query (e.g. AQL).
    pub fn plan(
        &self,
        corpus_name: &str,
        query: &str,
        query_language: QueryLanguage,
    ) -> Result<String> {
        let prep = self.prepare_query(corpus_name, query, query_language, vec![])?;

        // acquire read-only lock and plan
        let lock = prep.db_entry.read().unwrap();
        let db = get_read_or_error(&lock)?;
        let plan = ExecutionPlan::from_disjunction(&prep.query, &db, &self.query_config)?;

        Ok(format!("{}", plan))
    }

    /// Count the number of results for a `query`.
    /// - `corpus_name` - The name of the corpus to execute the query on.
    /// - `query` - The query as string.
    /// - `query_language` The query language of the query (e.g. AQL).
    ///
    /// Returns the count as number.
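    ///
    /// # Example
    ///
    /// A sketch (not part of the original docs); the corpus and annotation names
    /// are hypothetical and `cs` is an existing `CorpusStorage`.
    ///
    /// ```ignore
    /// use graphannis::corpusstorage::QueryLanguage;
    ///
    /// // Count how often the part-of-speech annotation value "NN" occurs.
    /// let n = cs.count("mycorpus", "pos=\"NN\"", QueryLanguage::AQL)?;
    /// println!("{} matches", n);
    /// ```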
    pub fn count(
        &self,
        corpus_name: &str,
        query: &str,
        query_language: QueryLanguage,
    ) -> Result<u64> {
        let prep = self.prepare_query(corpus_name, query, query_language, vec![])?;

        // acquire read-only lock and execute query
        let lock = prep.db_entry.read().unwrap();
        let db = get_read_or_error(&lock)?;
        let plan = ExecutionPlan::from_disjunction(&prep.query, &db, &self.query_config)?;

        Ok(plan.count() as u64)
    }

    /// Count the number of results for a `query` and return both the total number of matches and also the number of documents in the result set.
    ///
    /// - `corpus_name` - The name of the corpus to execute the query on.
    /// - `query` - The query as string.
    /// - `query_language` The query language of the query (e.g. AQL).
    pub fn count_extra(
        &self,
        corpus_name: &str,
        query: &str,
        query_language: QueryLanguage,
    ) -> Result<CountExtra> {
        let prep = self.prepare_query(corpus_name, query, query_language, vec![])?;

        // acquire read-only lock and execute query
        let lock = prep.db_entry.read().unwrap();
        let db: &Graph = get_read_or_error(&lock)?;
        let plan = ExecutionPlan::from_disjunction(&prep.query, &db, &self.query_config)?;

        let mut known_documents = HashSet::new();

        let node_name_key_id = db
            .node_annos
            .get_key_id(&db.get_node_name_key())
            .ok_or("No internal ID for node names found")?;

        let result = plan.fold((0, 0), move |acc: (u64, usize), m: Vec<Match>| {
            if !m.is_empty() {
                let m: &Match = &m[0];
                if let Some(node_name) = db
                    .node_annos
                    .get_value_for_item_by_id(&m.node, node_name_key_id)
                {
                    let node_name: &str = node_name;
                    // extract the document path from the node name
                    let doc_path =
                        &node_name[0..node_name.rfind('#').unwrap_or_else(|| node_name.len())];
                    known_documents.insert(doc_path.to_owned());
                }
            }
            (acc.0 + 1, known_documents.len())
        });

        Ok(CountExtra {
            match_count: result.0,
            document_count: result.1 as u64,
        })
    }

    /// Find all results for a `query` and return the match ID for each result.
    ///
    /// The query is paginated and an offset and limit can be specified.
    ///
    /// - `corpus_name` - The name of the corpus to execute the query on.
    /// - `query` - The query as string.
    /// - `query_language` The query language of the query (e.g. AQL).
    /// - `offset` - Skip the `n` first results, where `n` is the offset.
    /// - `limit` - Return at most `n` matches, where `n` is the limit.
    /// - `order` - Specify the order of the matches.
    ///
    /// Returns a vector of match IDs, where each match ID consists of the matched node annotation identifiers separated by spaces.
    /// You can use the [subgraph(...)](#method.subgraph) method to get the subgraph for a single match described by the node annotation identifiers.
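    ///
    /// # Example
    ///
    /// A sketch (not part of the original docs); `cs` is an existing
    /// `CorpusStorage` and the corpus name is hypothetical.
    ///
    /// ```ignore
    /// use graphannis::corpusstorage::{QueryLanguage, ResultOrder};
    ///
    /// // Get the first 10 matches for all tokens, in document order.
    /// let matches = cs.find("mycorpus", "tok", QueryLanguage::AQL, 0, 10, ResultOrder::Normal)?;
    /// for m in matches {
    ///     // Each entry is a space-separated list of matched node annotation identifiers.
    ///     println!("{}", m);
    /// }
    /// ```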
    pub fn find(
        &self,
        corpus_name: &str,
        query: &str,
        query_language: QueryLanguage,
        offset: usize,
        limit: usize,
        order: ResultOrder,
    ) -> Result<Vec<String>> {
        let order_component = Component {
            ctype: ComponentType::Ordering,
            layer: String::from("annis"),
            name: String::from(""),
        };
        let prep = self.prepare_query(corpus_name, query, query_language, vec![order_component])?;

        // acquire read-only lock and execute query
        let lock = prep.db_entry.read().unwrap();
        let db = get_read_or_error(&lock)?;

        let mut query_config = self.query_config.clone();
        if order == ResultOrder::NotSorted {
            // Do not use parallelization if the order should not be sorted, to get a more stable result ordering.
            // Even if we do not promise to have a stable ordering, it should be the same
            // for the same session on the same corpus.
            query_config.use_parallel_joins = false;
        }

        let plan = ExecutionPlan::from_disjunction(&prep.query, &db, &query_config)?;

        let mut expected_size: Option<usize> = None;
        let base_it: Box<dyn Iterator<Item = Vec<Match>>> = if order == ResultOrder::NotSorted
            || (order == ResultOrder::Normal && plan.is_sorted_by_text())
        {
            // if the output is already sorted correctly, directly return the iterator
            Box::from(plan)
        } else {
            let estimated_result_size = plan.estimated_output_size();
            let mut tmp_results: Vec<Vec<Match>> = Vec::with_capacity(estimated_result_size);

            for mgroup in plan {
                // add all matches to temporary vector
                tmp_results.push(mgroup);
            }

            // either sort or randomly shuffle results
            if order == ResultOrder::Randomized {
                let mut rng = rand::thread_rng();
                tmp_results.shuffle(&mut rng);
            } else {
                let token_helper = TokenHelper::new(db);
                let component_order = Component {
                    ctype: ComponentType::Ordering,
                    layer: String::from("annis"),
                    name: String::from(""),
                };

                let gs_order = db.get_graphstorage_as_ref(&component_order);
                let order_func = |m1: &Vec<Match>, m2: &Vec<Match>| -> std::cmp::Ordering {
                    if order == ResultOrder::Inverted {
                        db::sort_matches::compare_matchgroup_by_text_pos(
                            m1,
                            m2,
                            &db.node_annos,
                            token_helper.as_ref(),
                            gs_order,
                        )
                        .reverse()
                    } else {
                        db::sort_matches::compare_matchgroup_by_text_pos(
                            m1,
                            m2,
                            &db.node_annos,
                            token_helper.as_ref(),
                            gs_order,
                        )
                    }
                };

                if self.query_config.use_parallel_joins {
                    quicksort::sort_first_n_items_parallel(
                        &mut tmp_results,
                        offset + limit,
                        order_func,
                    );
                } else {
                    quicksort::sort_first_n_items(&mut tmp_results, offset + limit, order_func);
                }
            }
            expected_size = Some(tmp_results.len());
            Box::from(tmp_results.into_iter())
        };

        let node_name_key_id = db
            .node_annos
            .get_key_id(&db.get_node_name_key())
            .ok_or("No internal ID for node names found")?;

        let mut results: Vec<String> = if let Some(expected_size) = expected_size {
            Vec::with_capacity(std::cmp::min(expected_size, limit))
        } else {
            Vec::new()
        };
        results.extend(base_it.skip(offset).take(limit).map(|m: Vec<Match>| {
            let mut match_desc: Vec<String> = Vec::new();
            for singlematch in &m {
                let mut node_desc = String::new();

                if let Some(anno_key) = db.node_annos.get_key_value(singlematch.anno_key) {
                    if &anno_key.ns != "annis" {
                        if !anno_key.ns.is_empty() {
                            node_desc.push_str(&anno_key.ns);
                            node_desc.push_str("::");
                        }
                        node_desc.push_str(&anno_key.name);
                        node_desc.push_str("::");
                    }
                }

                if let Some(name) = db
                    .node_annos
                    .get_value_for_item_by_id(&singlematch.node, node_name_key_id)
                {
                    node_desc.push_str("salt:/");
                    node_desc.push_str(name);
                }

                match_desc.push(node_desc);
            }
            let mut result = String::new();
            result.push_str(&match_desc.join(" "));
            result
        }));

        Ok(results)
    }

    /// Return the copy of a subgraph which includes the given list of node annotation identifiers,
    /// the nodes that cover the same token as the given nodes, and
    /// all nodes that cover the tokens which are part of the defined context.
    ///
    /// - `corpus_name` - The name of the corpus for which the subgraph should be generated from.
    /// - `node_ids` - A set of node annotation identifiers describing the subgraph.
    /// - `ctx_left` and `ctx_right` - Left and right context in token distance to be included in the subgraph.
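    ///
    /// # Example
    ///
    /// A sketch (not part of the original docs) that combines `find` and
    /// `subgraph`; `cs` is an existing `CorpusStorage` and the corpus name is
    /// hypothetical.
    ///
    /// ```ignore
    /// use graphannis::corpusstorage::{QueryLanguage, ResultOrder};
    ///
    /// // Find one match and fetch its subgraph with 5 tokens of context on each side.
    /// let first = cs.find("mycorpus", "tok", QueryLanguage::AQL, 0, 1, ResultOrder::Normal)?;
    /// if let Some(match_desc) = first.first() {
    ///     let node_ids: Vec<String> = match_desc.split(' ').map(str::to_string).collect();
    ///     let graph = cs.subgraph("mycorpus", node_ids, 5, 5)?;
    /// }
    /// ```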
    pub fn subgraph(
        &self,
        corpus_name: &str,
        node_ids: Vec<String>,
        ctx_left: usize,
        ctx_right: usize,
    ) -> Result<Graph> {
        let db_entry = self.get_fully_loaded_entry(corpus_name)?;

        let mut query = Disjunction {
            alternatives: vec![],
        };

        // find all nodes covering the same token
        for source_node_id in node_ids {
            let source_node_id: &str = if source_node_id.starts_with("salt:/") {
                // remove the "salt:/" prefix
                &source_node_id[6..]
            } else {
                &source_node_id
            };

            // left context (non-token)
            {
                let mut q_left: Conjunction = Conjunction::new();

                let any_node_idx = q_left.add_node(NodeSearchSpec::AnyNode, None);

                let n_idx = q_left.add_node(
                    NodeSearchSpec::ExactValue {
                        ns: Some(db::ANNIS_NS.to_string()),
                        name: db::NODE_NAME.to_string(),
                        val: Some(source_node_id.to_string()),
                        is_meta: false,
                    },
                    None,
                );
                let tok_covered_idx = q_left.add_node(NodeSearchSpec::AnyToken, None);
                let tok_precedence_idx = q_left.add_node(NodeSearchSpec::AnyToken, None);

                q_left.add_operator(
                    Box::new(operators::OverlapSpec {}),
                    &n_idx,
                    &tok_covered_idx,
                    true,
                )?;
                q_left.add_operator(
                    Box::new(operators::PrecedenceSpec {
                        segmentation: None,
                        dist: RangeSpec::Bound {
                            min_dist: 0,
                            max_dist: ctx_left,
                        },
                    }),
                    &tok_precedence_idx,
                    &tok_covered_idx,
                    true,
                )?;
                q_left.add_operator(
                    Box::new(operators::OverlapSpec {}),
                    &any_node_idx,
                    &tok_precedence_idx,
                    true,
                )?;

                query.alternatives.push(q_left);
            }

            // left context (token only)
            {
                let mut q_left: Conjunction = Conjunction::new();

                let tok_precedence_idx = q_left.add_node(NodeSearchSpec::AnyToken, None);

                let n_idx = q_left.add_node(
                    NodeSearchSpec::ExactValue {
                        ns: Some(db::ANNIS_NS.to_string()),
                        name: db::NODE_NAME.to_string(),
                        val: Some(source_node_id.to_string()),
                        is_meta: false,
                    },
                    None,
                );
                let tok_covered_idx = q_left.add_node(NodeSearchSpec::AnyToken, None);

                q_left.add_operator(
                    Box::new(operators::OverlapSpec {}),
                    &n_idx,
                    &tok_covered_idx,
                    true,
                )?;
                q_left.add_operator(
                    Box::new(operators::PrecedenceSpec {
                        segmentation: None,
                        dist: RangeSpec::Bound {
                            min_dist: 0,
                            max_dist: ctx_left,
                        },
                    }),
                    &tok_precedence_idx,
                    &tok_covered_idx,
                    true,
                )?;

                query.alternatives.push(q_left);
            }

            // right context (non-token)
            {
                let mut q_right: Conjunction = Conjunction::new();

                let any_node_idx = q_right.add_node(NodeSearchSpec::AnyNode, None);

                let n_idx = q_right.add_node(
                    NodeSearchSpec::ExactValue {
                        ns: Some(db::ANNIS_NS.to_string()),
                        name: db::NODE_NAME.to_string(),
                        val: Some(source_node_id.to_string()),
                        is_meta: false,
                    },
                    None,
                );
                let tok_covered_idx = q_right.add_node(NodeSearchSpec::AnyToken, None);
                let tok_precedence_idx = q_right.add_node(NodeSearchSpec::AnyToken, None);

                q_right.add_operator(
                    Box::new(operators::OverlapSpec {}),
                    &n_idx,
                    &tok_covered_idx,
                    true,
                )?;
                q_right.add_operator(
                    Box::new(operators::PrecedenceSpec {
                        segmentation: None,
                        dist: RangeSpec::Bound {
                            min_dist: 0,
                            max_dist: ctx_right,
                        },
                    }),
                    &tok_covered_idx,
                    &tok_precedence_idx,
                    true,
                )?;
                q_right.add_operator(
                    Box::new(operators::OverlapSpec {}),
                    &any_node_idx,
                    &tok_precedence_idx,
                    true,
                )?;

                query.alternatives.push(q_right);
            }

            // right context (token only)
            {
                let mut q_right: Conjunction = Conjunction::new();

                let tok_precedence_idx = q_right.add_node(NodeSearchSpec::AnyToken, None);

                let n_idx = q_right.add_node(
                    NodeSearchSpec::ExactValue {
                        ns: Some(db::ANNIS_NS.to_string()),
                        name: db::NODE_NAME.to_string(),
                        val: Some(source_node_id.to_string()),
                        is_meta: false,
                    },
                    None,
                );
                let tok_covered_idx = q_right.add_node(NodeSearchSpec::AnyToken, None);

                q_right.add_operator(
                    Box::new(operators::OverlapSpec {}),
                    &n_idx,
                    &tok_covered_idx,
                    true,
                )?;
                q_right.add_operator(
                    Box::new(operators::PrecedenceSpec {
                        segmentation: None,
                        dist: RangeSpec::Bound {
                            min_dist: 0,
                            max_dist: ctx_right,
                        },
                    }),
                    &tok_covered_idx,
                    &tok_precedence_idx,
                    true,
                )?;

                query.alternatives.push(q_right);
            }
        }
        extract_subgraph_by_query(&db_entry, &query, &[0], &self.query_config, None)
    }

    /// Return the copy of a subgraph which includes all nodes matched by the given `query`.
    ///
    /// - `corpus_name` - The name of the corpus the subgraph should be generated from.
    /// - `query` - The query which defines included nodes.
    /// - `query_language` - The query language of the query (e.g. AQL).
    /// - `component_type_filter` - If set, only include edges that belong to a component of the given type.
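    ///
    /// # Example
    ///
    /// A minimal usage sketch (marked `ignore`, so it is not compiled as a
    /// doctest); an existing `CorpusStorage` named `cs` and a corpus called
    /// `"mycorpus"` are assumed:
    ///
    /// ```ignore
    /// // copy the subgraph induced by all token matches, keeping all edge components
    /// let g = cs.subgraph_for_query("mycorpus", "tok", QueryLanguage::AQL, None)?;
    /// // only copy edges that belong to a pointing relation component
    /// let g_pointing = cs.subgraph_for_query(
    ///     "mycorpus",
    ///     "tok",
    ///     QueryLanguage::AQL,
    ///     Some(ComponentType::Pointing),
    /// )?;
    /// ```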
    pub fn subgraph_for_query(
        &self,
        corpus_name: &str,
        query: &str,
        query_language: QueryLanguage,
        component_type_filter: Option<ComponentType>,
    ) -> Result<Graph> {
        let prep = self.prepare_query(corpus_name, query, query_language, vec![])?;

        let mut max_alt_size = 0;
        for alt in &prep.query.alternatives {
            max_alt_size = std::cmp::max(max_alt_size, alt.num_of_nodes());
        }

        let match_idx: Vec<usize> = (0..max_alt_size).collect();

        extract_subgraph_by_query(
            &prep.db_entry,
            &prep.query,
            &match_idx,
            &self.query_config,
            component_type_filter,
        )
    }

    /// Return the copy of a subgraph which includes all nodes that belong to any of the given list of sub-corpus/document identifiers.
    ///
    /// - `corpus_name` - The name of the corpus the subgraph should be generated from.
    /// - `corpus_ids` - A set of sub-corpus/document identifiers describing the subgraph.
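    ///
    /// # Example
    ///
    /// A minimal usage sketch (not compiled as a doctest); `cs` and the
    /// document identifier are assumed to exist:
    ///
    /// ```ignore
    /// // identifiers may be passed with or without the "salt:/" prefix
    /// let g = cs.subcorpus_graph("mycorpus", vec!["mycorpus/doc1".to_string()])?;
    /// ```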
    pub fn subcorpus_graph(&self, corpus_name: &str, corpus_ids: Vec<String>) -> Result<Graph> {
        let db_entry = self.get_fully_loaded_entry(corpus_name)?;

        let mut query = Disjunction {
            alternatives: vec![],
        };
        // find all nodes that are connected with the given corpus IDs
        for source_corpus_id in corpus_ids {
            let source_corpus_id: &str = if source_corpus_id.starts_with("salt:/") {
                // remove the "salt:/" prefix
                &source_corpus_id[6..]
            } else {
                &source_corpus_id
            };
            let mut q = Conjunction::new();
            let corpus_idx = q.add_node(
                NodeSearchSpec::ExactValue {
                    ns: Some(db::ANNIS_NS.to_string()),
                    name: db::NODE_NAME.to_string(),
                    val: Some(source_corpus_id.to_string()),
                    is_meta: false,
                },
                None,
            );
            let any_node_idx = q.add_node(NodeSearchSpec::AnyNode, None);
            q.add_operator(
                Box::new(operators::PartOfSubCorpusSpec {
                    dist: RangeSpec::Unbound,
                }),
                &any_node_idx,
                &corpus_idx,
                true,
            )?;
            query.alternatives.push(q);
        }

        extract_subgraph_by_query(&db_entry, &query, &[1], &self.query_config, None)
    }

    /// Return the copy of the graph of the corpus structure given by `corpus_name`.
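    ///
    /// # Example
    ///
    /// A minimal usage sketch (not compiled as a doctest), assuming `cs` and
    /// the corpus name exist:
    ///
    /// ```ignore
    /// let corpus_structure = cs.corpus_graph("mycorpus")?;
    /// ```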
    pub fn corpus_graph(&self, corpus_name: &str) -> Result<Graph> {
        let db_entry = self.get_loaded_entry(corpus_name, false)?;

        let subcorpus_components = {
            // make sure all subcorpus partitions are loaded
            let lock = db_entry.read().unwrap();
            let db = get_read_or_error(&lock)?;
            db.get_all_components(Some(ComponentType::PartOfSubcorpus), None)
        };
        let db_entry = self.get_loaded_entry_with_components(corpus_name, subcorpus_components)?;

        let mut query = Conjunction::new();

        query.add_node(
            NodeSearchSpec::new_exact(Some(ANNIS_NS), NODE_TYPE, Some("corpus"), false),
            None,
        );

        extract_subgraph_by_query(
            &db_entry,
            &query.into_disjunction(),
            &[0],
            &self.query_config,
            Some(ComponentType::PartOfSubcorpus),
        )
    }

    /// Execute a frequency query.
    ///
    /// - `corpus_name` - The name of the corpus to execute the query on.
    /// - `query` - The query as string.
    /// - `query_language` - The query language of the query (e.g. AQL).
    /// - `definition` - A list of frequency query definitions.
    ///
    /// Returns a frequency table of strings.
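    ///
    /// # Example
    ///
    /// A minimal usage sketch (not compiled as a doctest). It assumes `cs`
    /// exists and that the fields of `FrequencyDefEntry` are publicly
    /// constructible as shown; `"1"` refers to the first query node:
    ///
    /// ```ignore
    /// let defs = vec![FrequencyDefEntry {
    ///     ns: None,
    ///     name: "tok".to_string(),
    ///     node_ref: "1".to_string(),
    /// }];
    /// let table = cs.frequency("mycorpus", "tok", QueryLanguage::AQL, defs)?;
    /// for (tuple, count) in table {
    ///     println!("{}\t{}", tuple.join("|"), count);
    /// }
    /// ```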
    pub fn frequency(
        &self,
        corpus_name: &str,
        query: &str,
        query_language: QueryLanguage,
        definition: Vec<FrequencyDefEntry>,
    ) -> Result<FrequencyTable<String>> {
        let prep = self.prepare_query(corpus_name, query, query_language, vec![])?;

        // acquire read-only lock and execute the query
        let lock = prep.db_entry.read().unwrap();
        let db: &Graph = get_read_or_error(&lock)?;

        // get the matching annotation keys for each definition entry
        let mut annokeys: Vec<(usize, Vec<AnnoKey>)> = Vec::default();
        for def in definition {
            if let Some(node_ref) = prep.query.get_variable_pos(&def.node_ref) {
                if let Some(ns) = def.ns {
                    // add the single fully qualified annotation key
                    annokeys.push((
                        node_ref,
                        vec![AnnoKey {
                            ns: ns.clone(),
                            name: def.name.clone(),
                        }],
                    ));
                } else {
                    // add all matching annotation keys
                    annokeys.push((node_ref, db.node_annos.get_qnames(&def.name)));
                }
            }
        }

        let plan = ExecutionPlan::from_disjunction(&prep.query, &db, &self.query_config)?;

        let mut tuple_frequency: FxHashMap<Vec<String>, usize> = FxHashMap::default();

        for mgroup in plan {
            // for each match, extract the defined annotation (by its key) from the result node
            let mut tuple: Vec<String> = Vec::with_capacity(annokeys.len());
            for (node_ref, anno_keys) in &annokeys {
                let mut tuple_val: String = String::default();
                if *node_ref < mgroup.len() {
                    let m: &Match = &mgroup[*node_ref];
                    for k in anno_keys.iter() {
                        if let Some(val) = db.node_annos.get_value_for_item(&m.node, k) {
                            tuple_val = val.to_owned();
                        }
                    }
                }
                tuple.push(tuple_val);
            }
            // add the tuple to the frequency count
            let tuple_count: &mut usize = tuple_frequency.entry(tuple).or_insert(0);
            *tuple_count += 1;
        }

        // output the frequency
        let mut result: FrequencyTable<String> = FrequencyTable::default();
        for (tuple, count) in tuple_frequency {
            result.push((tuple, count));
        }

        // sort the output (largest to smallest)
        result.sort_by(|a, b| a.1.cmp(&b.1).reverse());

        Ok(result)
    }

    /// Parses a `query` and returns a list of descriptions for its nodes.
    ///
    /// - `query` - The query to be analyzed.
    /// - `query_language` - The query language of the query (e.g. AQL).
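    ///
    /// # Example
    ///
    /// A minimal usage sketch (not compiled as a doctest); only the
    /// `alternative` field set by this method is accessed:
    ///
    /// ```ignore
    /// let descriptions = cs.node_descriptions("tok . tok", QueryLanguage::AQL)?;
    /// for d in &descriptions {
    ///     println!("node belongs to alternative {}", d.alternative);
    /// }
    /// ```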
    pub fn node_descriptions(
        &self,
        query: &str,
        query_language: QueryLanguage,
    ) -> Result<Vec<QueryAttributeDescription>> {
        let mut result = Vec::new();
        // parse query
        let q: Disjunction = match query_language {
            QueryLanguage::AQL => aql::parse(query, false)?,
            QueryLanguage::AQLQuirksV3 => aql::parse(query, true)?,
        };

        for (component_nr, alt) in q.alternatives.into_iter().enumerate() {
            for mut n in alt.get_node_descriptions() {
                n.alternative = component_nr;
                result.push(n);
            }
        }

        Ok(result)
    }

    /// Returns a list of all components of a corpus given by `corpus_name`.
    ///
    /// - `ctype` - Optionally filter by the component type.
    /// - `name` - Optionally filter by the component name.
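    ///
    /// # Example
    ///
    /// A minimal usage sketch (not compiled as a doctest), assuming `cs` and
    /// the corpus exist:
    ///
    /// ```ignore
    /// let pointing = cs.list_components("mycorpus", Some(ComponentType::Pointing), None);
    /// println!("corpus has {} pointing relation component(s)", pointing.len());
    /// ```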
    pub fn list_components(
        &self,
        corpus_name: &str,
        ctype: Option<ComponentType>,
        name: Option<&str>,
    ) -> Vec<Component> {
        if let Ok(db_entry) = self.get_loaded_entry(corpus_name, false) {
            let lock = db_entry.read().unwrap();
            if let Ok(db) = get_read_or_error(&lock) {
                return db.get_all_components(ctype, name);
            }
        }
        vec![]
    }

    /// Returns a list of all node annotations of a corpus given by `corpus_name`.
    ///
    /// - `list_values` - If true include the possible values in the result.
    /// - `only_most_frequent_values` - If both this argument and `list_values` are true, only return the most frequent value for each annotation name.
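    ///
    /// # Example
    ///
    /// A minimal usage sketch (not compiled as a doctest):
    ///
    /// ```ignore
    /// // annotation keys only, `val` stays empty
    /// let keys_only = cs.list_node_annotations("mycorpus", false, false);
    /// // one entry per annotation name, with its most frequent value
    /// let top_values = cs.list_node_annotations("mycorpus", true, true);
    /// ```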
    pub fn list_node_annotations(
        &self,
        corpus_name: &str,
        list_values: bool,
        only_most_frequent_values: bool,
    ) -> Vec<Annotation> {
        let mut result: Vec<Annotation> = Vec::new();
        if let Ok(db_entry) = self.get_loaded_entry(corpus_name, false) {
            let lock = db_entry.read().unwrap();
            if let Ok(db) = get_read_or_error(&lock) {
                let node_annos: &AnnoStorage<NodeID> = &db.node_annos;
                for key in node_annos.annotation_keys() {
                    if list_values {
                        if only_most_frequent_values {
                            // the first value is the most frequent one
                            if let Some(val) =
                                node_annos.get_all_values(&key, true).into_iter().next()
                            {
                                result.push(Annotation {
                                    key: key.clone(),
                                    val: val.to_owned(),
                                });
                            }
                        } else {
                            // get all values
                            for val in node_annos.get_all_values(&key, false) {
                                result.push(Annotation {
                                    key: key.clone(),
                                    val: val.to_owned(),
                                });
                            }
                        }
                    } else {
                        result.push(Annotation {
                            key: key.clone(),
                            val: String::default(),
                        });
                    }
                }
            }
        }

        result
    }

    /// Returns a list of all edge annotations of a corpus given by `corpus_name` and the `component`.
    ///
    /// - `list_values` - If true include the possible values in the result.
    /// - `only_most_frequent_values` - If both this argument and `list_values` are true, only return the most frequent value for each annotation name.
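    ///
    /// # Example
    ///
    /// A minimal usage sketch (not compiled as a doctest) that combines this
    /// method with `list_components`:
    ///
    /// ```ignore
    /// for component in cs.list_components("mycorpus", Some(ComponentType::Pointing), None) {
    ///     let annos = cs.list_edge_annotations("mycorpus", &component, false, false);
    ///     println!("component has {} edge annotation key(s)", annos.len());
    /// }
    /// ```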
    pub fn list_edge_annotations(
        &self,
        corpus_name: &str,
        component: &Component,
        list_values: bool,
        only_most_frequent_values: bool,
    ) -> Vec<Annotation> {
        let mut result: Vec<Annotation> = Vec::new();
        if let Ok(db_entry) =
            self.get_loaded_entry_with_components(corpus_name, vec![component.clone()])
        {
            let lock = db_entry.read().unwrap();
            if let Ok(db) = get_read_or_error(&lock) {
                if let Some(gs) = db.get_graphstorage(&component) {
                    let edge_annos: &AnnotationStorage<Edge> = gs.get_anno_storage();
                    for key in edge_annos.annotation_keys() {
                        if list_values {
                            if only_most_frequent_values {
                                // the first value is the most frequent one
                                if let Some(val) =
                                    edge_annos.get_all_values(&key, true).into_iter().next()
                                {
                                    result.push(Annotation {
                                        key: key.clone(),
                                        val: val.to_owned(),
                                    });
                                }
                            } else {
                                // get all values
                                for val in edge_annos.get_all_values(&key, false) {
                                    result.push(Annotation {
                                        key: key.clone(),
                                        val: val.to_owned(),
                                    });
                                }
                            }
                        } else {
                            result.push(Annotation {
                                key: key.clone(),
                                val: String::new(),
                            });
                        }
                    }
                }
            }
        }

        result
    }

    fn check_cache_size_and_remove(&self, keep: Vec<&str>) {
        let mut cache_lock = self.corpus_cache.write().unwrap();
        let cache = &mut *cache_lock;
        check_cache_size_and_remove_with_cache(cache, &self.cache_strategy, keep);
    }
}

impl Drop for CorpusStorage {
    fn drop(&mut self) {
        // wait until all background workers are finished
        let &(ref lock, ref cvar) = &*self.active_background_workers;
        let mut nr_active_background_workers = lock.lock().unwrap();
        while *nr_active_background_workers > 0 {
            trace!(
                "Waiting for background thread to finish ({} worker(s) left)...",
                *nr_active_background_workers
            );
            nr_active_background_workers = cvar.wait(nr_active_background_workers).unwrap();
        }

        // unlock lock file
        if let Err(e) = self.lock_file.unlock() {
            warn!("Could not unlock CorpusStorage lock file: {:?}", e);
        } else {
            trace!("Unlocked CorpusStorage lock file");
        }
    }
}

#[cfg(test)]
mod tests {
    extern crate log;
    extern crate simplelog;
    extern crate tempfile;

    use crate::corpusstorage::QueryLanguage;
    use crate::update::{GraphUpdate, UpdateEvent};
    use crate::CorpusStorage;

    #[test]
    fn delete() {
        if let Ok(tmp) = tempfile::tempdir() {
            let cs = CorpusStorage::with_auto_cache_size(tmp.path(), false).unwrap();
            // fully load a corpus
            let mut g = GraphUpdate::new();
            g.add_event(UpdateEvent::AddNode {
                node_name: "test".to_string(),
                node_type: "node".to_string(),
            });

            cs.apply_update("testcorpus", &mut g).unwrap();
            cs.preload("testcorpus").unwrap();
            cs.delete("testcorpus").unwrap();
        }
    }

    #[test]
    fn load_cs_twice() {
        if let Ok(tmp) = tempfile::tempdir() {
            {
                let cs = CorpusStorage::with_auto_cache_size(tmp.path(), false).unwrap();
                let mut g = GraphUpdate::new();
                g.add_event(UpdateEvent::AddNode {
                    node_name: "test".to_string(),
                    node_type: "node".to_string(),
                });

                cs.apply_update("testcorpus", &mut g).unwrap();
            }

            {
                let cs = CorpusStorage::with_auto_cache_size(tmp.path(), false).unwrap();
                let mut g = GraphUpdate::new();
                g.add_event(UpdateEvent::AddNode {
                    node_name: "test".to_string(),
                    node_type: "node".to_string(),
                });

                cs.apply_update("testcorpus", &mut g).unwrap();
            }
        }
    }

    #[test]
    fn apply_update_add_and_delete_nodes() {
        if let Ok(tmp) = tempfile::tempdir() {
            let cs = CorpusStorage::with_auto_cache_size(tmp.path(), false).unwrap();

            let mut g = GraphUpdate::new();
            g.add_event(UpdateEvent::AddNode {
                node_name: "root".to_string(),
                node_type: "corpus".to_string(),
            });
            g.add_event(UpdateEvent::AddNode {
                node_name: "root/doc1".to_string(),
                node_type: "corpus".to_string(),
            });
            g.add_event(UpdateEvent::AddNode {
                node_name: "root/doc1#MyToken1".to_string(),
                node_type: "node".to_string(),
            });

            g.add_event(UpdateEvent::AddNode {
                node_name: "root/doc1#MyToken2".to_string(),
                node_type: "node".to_string(),
            });

            g.add_event(UpdateEvent::AddEdge {
                source_node: "root/doc1#MyToken1".to_owned(),
                target_node: "root/doc1#MyToken2".to_owned(),
                layer: "dep".to_owned(),
                component_type: "Pointing".to_owned(),
                component_name: "dep".to_owned(),
            });

            g.add_event(UpdateEvent::AddNode {
                node_name: "root/doc2".to_string(),
                node_type: "corpus".to_string(),
            });
            g.add_event(UpdateEvent::AddNode {
                node_name: "root/doc2#MyToken".to_string(),
                node_type: "node".to_string(),
            });

            cs.apply_update("root", &mut g).unwrap();

            let node_count = cs.count("root", "node", QueryLanguage::AQL).unwrap();
            assert_eq!(3, node_count);

            let edge_count = cs.count("root", "node ->dep node", QueryLanguage::AQL).unwrap();
            assert_eq!(1, edge_count);

            // delete one of the tokens
            let mut g = GraphUpdate::new();
            g.add_event(UpdateEvent::DeleteNode {
                node_name: "root/doc1#MyToken2".to_string(),
            });
            cs.apply_update("root", &mut g).unwrap();

            let node_count = cs.count("root", "node", QueryLanguage::AQL).unwrap();
            assert_eq!(2, node_count);
            let edge_count = cs.count("root", "node ->dep node", QueryLanguage::AQL).unwrap();
            assert_eq!(0, edge_count);
        }
    }
}

fn get_read_or_error<'a>(lock: &'a RwLockReadGuard<CacheEntry>) -> Result<&'a Graph> {
    if let CacheEntry::Loaded(ref db) = &**lock {
        Ok(db)
    } else {
        Err(ErrorKind::LoadingDBFailed("".to_string()).into())
    }
}

fn get_write_or_error<'a>(lock: &'a mut RwLockWriteGuard<CacheEntry>) -> Result<&'a mut Graph> {
    if let CacheEntry::Loaded(ref mut db) = &mut **lock {
        Ok(db)
    } else {
        Err("Could not get loaded graph storage entry".into())
    }
}

fn check_cache_size_and_remove_with_cache(
    cache: &mut LinkedHashMap<String, Arc<RwLock<CacheEntry>>>,
    cache_strategy: &CacheStrategy,
    keep: Vec<&str>,
) {
    let mut mem_ops = MallocSizeOfOps::new(memory_estimation::platform::usable_size, None, None);

    let keep: HashSet<&str> = keep.into_iter().collect();

    // check size of each corpus and calculate the sum of used memory
    let mut size_sum: usize = 0;
    let mut db_sizes: LinkedHashMap<String, usize> = LinkedHashMap::new();
    for (corpus, db_entry) in cache.iter() {
        let lock = db_entry.read().unwrap();
        if let CacheEntry::Loaded(ref db) = &*lock {
            let s = db.size_of_cached(&mut mem_ops);
            size_sum += s;
            db_sizes.insert(corpus.clone(), s);
        }
    }

    let max_cache_size: usize = match cache_strategy {
        CacheStrategy::FixedMaxMemory(max_size) => *max_size,
        CacheStrategy::PercentOfFreeMemory(max_percent) => {
            // get the current free space in main memory
            if let Ok(mem) = sys_info::mem_info() {
                // the currently available system memory (mem.avail is reported in KiB)
                let free_system_mem: usize = mem.avail as usize * 1024;
                // A part of the system memory is already used by the cache.
                // We want x percent of the overall available memory (thus not
                // used by us), so add the current cache size.
                let available_memory: usize = free_system_mem + size_sum;
                ((available_memory as f64) * (max_percent / 100.0)) as usize
            } else {
                // fallback to include only the last loaded corpus if free memory size is unknown
                0
            }
        }
    };

    debug!(
        "Current cache size is {:.2} MB / max {:.2} MB",
        (size_sum as f64) / (1024.0 * 1024.0),
        (max_cache_size as f64) / (1024.0 * 1024.0)
    );

    // remove the oldest entries (at the beginning of the insertion-ordered map)
    // until the cache size requirement is met, but never remove entries that
    // are explicitly marked to be kept
    for (corpus_name, corpus_size) in db_sizes.iter() {
        if size_sum > max_cache_size {
            if !keep.contains(corpus_name.as_str()) {
                info!("Removing corpus {} from cache", corpus_name);
                cache.remove(corpus_name);
                size_sum -= corpus_size;
                debug!(
                    "Current cache size is {:.2} MB / max {:.2} MB",
                    (size_sum as f64) / (1024.0 * 1024.0),
                    (max_cache_size as f64) / (1024.0 * 1024.0)
                );
            }
        } else {
            // cache size is smaller, nothing to do
            break;
        }
    }
}

fn extract_subgraph_by_query(
    db_entry: &Arc<RwLock<CacheEntry>>,
    query: &Disjunction,
    match_idx: &[usize],
    query_config: &query::Config,
    component_type_filter: Option<ComponentType>,
) -> Result<Graph> {
    // acquire read-only lock and create a query that finds the context nodes
    let lock = db_entry.read().unwrap();
    let orig_db = get_read_or_error(&lock)?;

    let plan = ExecutionPlan::from_disjunction(&query, &orig_db, &query_config)
        .chain_err(|| "Could not create execution plan for subgraph query")?;

    debug!("executing subgraph query\n{}", plan);

    // We have to keep our own unique set because the query will return "duplicates" whenever the other parts of the
    // match vector differ.
    let mut match_result: BTreeSet<Match> = BTreeSet::new();

    let mut result = Graph::new();

    // create the subgraph description
    for r in plan {
        trace!("subgraph query found match {:?}", r);
        for i in match_idx.iter().cloned() {
            if i < r.len() {
                let m: &Match = &r[i];
                if !match_result.contains(m) {
                    match_result.insert(m.clone());
                    trace!("subgraph query extracted node {:?}", m.node);
                    create_subgraph_node(m.node, &mut result, orig_db);
                }
            }
        }
    }

    let components = orig_db.get_all_components(component_type_filter, None);

    for m in &match_result {
        create_subgraph_edge(m.node, &mut result, orig_db, &components);
    }

    Ok(result)
}

fn create_subgraph_node(id: NodeID, db: &mut Graph, orig_db: &Graph) {
    // add all node labels with the same node ID
    let node_annos = Arc::make_mut(&mut db.node_annos);
    for a in orig_db.node_annos.get_annotations_for_item(&id) {
        node_annos.insert(id, a);
    }
}
fn create_subgraph_edge(
    source_id: NodeID,
    db: &mut Graph,
    orig_db: &Graph,
    components: &[Component],
) {
    // find outgoing edges
    for c in components {
        if let Some(orig_gs) = orig_db.get_graphstorage(c) {
            for target in orig_gs.get_outgoing_edges(source_id) {
                // only copy the edge if the target node is also part of the subgraph
                if !db.node_annos.get_all_keys_for_item(&target).is_empty() {
                    let e = Edge {
                        source: source_id,
                        target,
                    };
                    if let Ok(new_gs) = db.get_or_create_writable(&c) {
                        new_gs.add_edge(e.clone());
                        // also copy all annotations of the original edge
                        for a in orig_gs.get_anno_storage().get_annotations_for_item(&e) {
                            new_gs.add_edge_annotation(e.clone(), a);
                        }
                    }
                }
            }
        }
    }
}

fn create_lockfile_for_directory(db_dir: &Path) -> Result<File> {
    std::fs::create_dir_all(&db_dir)
        .chain_err(|| format!("Could not create directory {}", db_dir.to_string_lossy()))?;
    let lock_file_path = db_dir.join("db.lock");
    // check if we can get the file lock
    let lock_file = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .open(lock_file_path.as_path())
        .chain_err(|| {
            format!(
                "Could not open or create lockfile {}",
                lock_file_path.to_string_lossy()
            )
        })?;
    lock_file.try_lock_exclusive().chain_err(|| {
        format!(
            "Could not accuire lock for directory {}",
            db_dir.to_string_lossy()
        )
    })?;

    Ok(lock_file)
}