1
   2
   3
   4
   5
   6
   7
   8
   9
  10
  11
  12
  13
  14
  15
  16
  17
  18
  19
  20
  21
  22
  23
  24
  25
  26
  27
  28
  29
  30
  31
  32
  33
  34
  35
  36
  37
  38
  39
  40
  41
  42
  43
  44
  45
  46
  47
  48
  49
  50
  51
  52
  53
  54
  55
  56
  57
  58
  59
  60
  61
  62
  63
  64
  65
  66
  67
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
//! High level wrapper of LMDB APIs
//!
//! Requires knowledge of LMDB terminology
//!
//! # Environment
//!
//! Environment is actually the center point of LMDB, it's a container
//! of everything else. As some settings couldn't be adjusted after
//! opening, `Environment` is constructed using `EnvBuilder`, which
//! sets up maximum size, maximum count of named databases, maximum
//! readers which could be used from different threads without locking
//! and so on.
//!
//! # Database
//!
//! Actual key-value store. The most crucial aspect is whether a database
//! allows duplicates or not. It is specified on creation and couldn't be
//! changed later. Entries for the same key are called `items`.
//!
//! There are a couple of optimizations to use, like marking
//! keys or data as integer, allowing sorting using reverse key, marking
//! keys/data as fixed size.
//!
//! # Transaction
//!
//! Absolutely every db operation happens in a transaction. It could
//! be a read-only transaction (reader), which is lockless and therefore
//! cheap. Or it could be a read-write transaction, which is unique, i.e.
//! there could be only one writer at a time.
//!
//! While readers are cheap and lockless, they work better being short-lived,
//! as otherwise they may prevent pages from being reused. Readers have
//! a special API for marking as finished and renewing.
//!
//! It is perfectly fine to create nested transactions.
//!
//!
//! # Example
//!

#![allow(non_upper_case_globals)]

use libc::{c_int, c_uint, size_t, c_void};
use std;
use std::borrow::ToOwned;
use std::cell::{UnsafeCell};
use std::cmp::{Ordering};
use std::collections::HashMap;
use std::error::Error;
use std::ffi::{CString};
use std::path::Path;
use std::mem;
use std::ptr;
use std::result::Result;
use std::sync::{Arc, Mutex};

use ffi::{self, MDB_val};
pub use MdbError::{NotFound, KeyExists, Other, StateError, Corrupted, Panic};
pub use MdbError::{InvalidPath, TxnFull, CursorFull, PageFull, CacheError};
use traits::{ToMdbValue, FromMdbValue};
use utils::{error_msg};


// Turns an LMDB return code into a `Result`.
//
// * `lift_mdb!(call)` evaluates to `Ok(())` when `call` yields
//   `ffi::MDB_SUCCESS`.
// * `lift_mdb!(call, value)` evaluates to `Ok(value)` in that case; `value`
//   is only evaluated on success.
//
// Any other code makes the *enclosing function* return
// `Err(MdbError::new_with_code(code))`, so the macro may only be used inside
// functions returning a compatible `Result`.
macro_rules! lift_mdb {
    ($e:expr) => (lift_mdb!($e, ()));
    ($e:expr, $r:expr) => ({
        let code = $e;
        if code == ffi::MDB_SUCCESS {
            Ok($r)
        } else {
            return Err(MdbError::new_with_code(code))
        }
    })
}

// Checks an LMDB return code and bails out of the enclosing function on
// failure.
//
// Evaluates to `()` when the code equals `ffi::MDB_SUCCESS`; otherwise the
// enclosing function returns `Err(MdbError::new_with_code(code))`.
macro_rules! try_mdb {
    ($e:expr) => ({
        let code = $e;
        if code != ffi::MDB_SUCCESS {
            return Err(MdbError::new_with_code(code))
        }
    })
}

// Makes the enclosing function return `Err(StateError(..))` unless `$cur`
// equals `$exp`.
//
// * `$log` - identifier used purely as a label at the start of the message.
// * `$cur` - the state the object is actually in.
// * `$exp` - the state the operation requires.
//
// Both expressions are evaluated exactly once and must support `==` and
// `{:?}` formatting. On success the macro evaluates to `()`.
macro_rules! assert_state_eq {
    ($log:ident, $cur:expr, $exp:expr) =>
        ({
            let c = $cur;
            let e = $exp;
            if c == e {
                ()
            } else {
                // The required state goes first and the actual state second,
                // so the arguments match the wording of the message.
                let msg = format!("{} requires {:?}, is in {:?}", stringify!($log), e, c);
                return Err(StateError(msg))
            }})
}

// Makes the enclosing function return `Err(StateError(..))` when `$cur`
// equals the forbidden state `$exp`.
//
// `$log` is an identifier used only as a label in the message. Both
// expressions are evaluated exactly once; on success the macro evaluates
// to `()`.
macro_rules! assert_state_not {
    ($log:ident, $cur:expr, $exp:expr) => ({
        let actual = $cur;
        let forbidden = $exp;
        if actual == forbidden {
            let msg = format!("{} shouldn't be in {:?}", stringify!($log), forbidden);
            return Err(StateError(msg))
        }
    })
}

/// Error type for this wrapper.
///
/// Several variants mirror specific LMDB return codes (see
/// `MdbError::new_with_code`); the remaining ones are produced by the
/// wrapper itself.
#[derive(Debug)]
pub enum MdbError {
    /// Mapped from `ffi::MDB_NOTFOUND`.
    NotFound,
    /// Mapped from `ffi::MDB_KEYEXIST`.
    KeyExists,
    /// Mapped from `ffi::MDB_TXN_FULL`.
    TxnFull,
    /// Mapped from `ffi::MDB_CURSOR_FULL`.
    CursorFull,
    /// Mapped from `ffi::MDB_PAGE_FULL`.
    PageFull,
    /// Mapped from `ffi::MDB_CORRUPTED`.
    Corrupted,
    /// Mapped from `ffi::MDB_PANIC`.
    Panic,
    /// Described as "invalid path for database"; not produced from an LMDB code.
    InvalidPath,
    /// An object was in the wrong state for an operation; carries the
    /// message built by the `assert_state_*` macros.
    StateError(String),
    /// Described as "db cache error"; not produced from an LMDB code.
    CacheError,
    /// Any other return code, together with the text from `error_msg`.
    Other(c_int, String)
}


impl MdbError {
    pub fn new_with_code(code: c_int) -> MdbError {
        match code {
            ffi::MDB_NOTFOUND    => NotFound,
            ffi::MDB_KEYEXIST    => KeyExists,
            ffi::MDB_TXN_FULL    => TxnFull,
            ffi::MDB_CURSOR_FULL => CursorFull,
            ffi::MDB_PAGE_FULL   => PageFull,
            ffi::MDB_CORRUPTED   => Corrupted,
            ffi::MDB_PANIC       => Panic,
            _                    => Other(code, error_msg(code))
        }
    }
}


/// Human-readable rendering of an `MdbError`.
///
/// `StateError` prints its own message, `Other` prints `"<code>: <message>"`,
/// and every remaining variant prints its `description()` text.
impl std::fmt::Display for MdbError {
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
        match *self {
            StateError(ref msg) => write!(fmt, "{}", msg),
            Other(code, ref msg) => write!(fmt, "{}: {}", code, msg),
            NotFound | KeyExists | TxnFull | CursorFull | PageFull |
            Corrupted | Panic | InvalidPath | CacheError => {
                write!(fmt, "{}", self.description())
            }
        }
    }
}

impl Error for MdbError {
    fn description(&self) -> &'static str {
        match self {
            &NotFound => "not found",
            &KeyExists => "key exists",
            &TxnFull => "txn full",
            &CursorFull => "cursor full",
            &PageFull => "page full",
            &Corrupted => "corrupted",
            &Panic => "panic",
            &InvalidPath => "invalid path for database",
            &StateError(_) => "state error",
            &CacheError => "db cache error",
            &Other(_, _) => "other error",
        }
    }
}


/// Result alias used by this module: the error side is always an `MdbError`.
pub type MdbResult<T> = Result<T, MdbError>;

// Flags that may still be toggled once an environment is open. Each constant
// takes its bit value from the `ffi::MDB_*` constant of the same meaning; the
// per-flag text below is carried in `#[doc]` attributes.
bitflags! {
    #[doc = "A set of environment flags which could be changed after opening"]

    pub flags EnvFlags: c_uint {

        #[doc="Don't flush system buffers to disk when committing a
        transaction. This optimization means a system crash can
        corrupt the database or lose the last transactions if buffers
        are not yet flushed to disk. The risk is governed by how
        often the system flushes dirty buffers to disk and how often
        mdb_env_sync() is called. However, if the filesystem
        preserves write order and the MDB_WRITEMAP flag is not used,
        transactions exhibit ACI (atomicity, consistency, isolation)
        properties and only lose D (durability). I.e. database
        integrity is maintained, but a system crash may undo the
        final transactions. Note that (MDB_NOSYNC | MDB_WRITEMAP)
        leaves the system with no hint for when to write transactions
        to disk, unless mdb_env_sync() is called. (MDB_MAPASYNC |
        MDB_WRITEMAP) may be preferable. This flag may be changed at
        any time using mdb_env_set_flags()."]
        const EnvNoSync      = ffi::MDB_NOSYNC,

        #[doc="Flush system buffers to disk only once per transaction,
        omit the metadata flush. Defer that until the system flushes
        files to disk, or next non-MDB_RDONLY commit or
        mdb_env_sync(). This optimization maintains database
        integrity, but a system crash may undo the last committed
        transaction. I.e. it preserves the ACI (atomicity,
        consistency, isolation) but not D (durability) database
        property. This flag may be changed at any time using
        mdb_env_set_flags()."]
        const EnvNoMetaSync  = ffi::MDB_NOMETASYNC,

        #[doc="When using MDB_WRITEMAP, use asynchronous flushes to
        disk. As with MDB_NOSYNC, a system crash can then corrupt the
        database or lose the last transactions. Calling
        mdb_env_sync() ensures on-disk database integrity until next
        commit. This flag may be changed at any time using
        mdb_env_set_flags()."]
        const EnvMapAsync    = ffi::MDB_MAPASYNC,

        #[doc="Don't initialize malloc'd memory before writing to
        unused spaces in the data file. By default, memory for pages
        written to the data file is obtained using malloc. While
        these pages may be reused in subsequent transactions, freshly
        malloc'd pages will be initialized to zeroes before use. This
        avoids persisting leftover data from other code (that used
        the heap and subsequently freed the memory) into the data
        file. Note that many other system libraries may allocate and
        free memory from the heap for arbitrary uses. E.g., stdio may
        use the heap for file I/O buffers. This initialization step
        has a modest performance cost so some applications may want
        to disable it using this flag. This option can be a problem
        for applications which handle sensitive data like passwords,
        and it makes memory checkers like Valgrind noisy. This flag
        is not needed with MDB_WRITEMAP, which writes directly to the
        mmap instead of using malloc for pages. The initialization is
        also skipped if MDB_RESERVE is used; the caller is expected
        to overwrite all of the memory that was reserved in that
        case. This flag may be changed at any time using
        mdb_env_set_flags()."]
        const EnvNoMemInit   = ffi::MDB_NOMEMINIT
    }
}

// Full set of flags accepted when an environment is created/opened. Each
// constant takes its bit value from the matching `ffi::MDB_*` constant.
//
// NOTE(review): `EnvCreataMapAsync` is spelled differently from its
// `EnvCreate*` siblings; the name is public, so it is left unchanged here.
bitflags! {
    #[doc = "A set of all environment flags"]

    pub flags EnvCreateFlags: c_uint {
        #[doc="Use a fixed address for the mmap region. This flag must be"]
        #[doc=" specified when creating the environment, and is stored persistently"]
        #[doc=" in the environment. If successful, the memory map will always reside"]
        #[doc=" at the same virtual address and pointers used to reference data items"]
        #[doc=" in the database will be constant across multiple invocations. This "]
        #[doc="option may not always work, depending on how the operating system has"]
        #[doc=" allocated memory to shared libraries and other uses. The feature is highly experimental."]
        const EnvCreateFixedMap    = ffi::MDB_FIXEDMAP,
        #[doc="By default, LMDB creates its environment in a directory whose"]
        #[doc=" pathname is given in path, and creates its data and lock files"]
        #[doc=" under that directory. With this option, path is used as-is"]
        #[doc=" for the database main data file. The database lock file is"]
        #[doc=" the path with \"-lock\" appended."]
        const EnvCreateNoSubDir    = ffi::MDB_NOSUBDIR,
        #[doc="Don't flush system buffers to disk when committing a"]
        #[doc=" transaction. This optimization means a system crash can corrupt"]
        #[doc=" the database or lose the last transactions if buffers are not"]
        #[doc=" yet flushed to disk. The risk is governed by how often the"]
        #[doc=" system flushes dirty buffers to disk and how often"]
        #[doc=" mdb_env_sync() is called. However, if the filesystem preserves"]
        #[doc=" write order and the MDB_WRITEMAP flag is not used, transactions"]
        #[doc=" exhibit ACI (atomicity, consistency, isolation) properties and"]
        #[doc=" only lose D (durability). I.e. database integrity is"]
        #[doc=" maintained, but a system crash may undo the final"]
        #[doc=" transactions. Note that (MDB_NOSYNC | MDB_WRITEMAP) leaves"]
        #[doc=" the system with no hint for when to write transactions to"]
        #[doc=" disk, unless mdb_env_sync() is called."]
        #[doc=" (MDB_MAPASYNC | MDB_WRITEMAP) may be preferable. This flag"]
        #[doc=" may be changed at any time using mdb_env_set_flags()."]
        const EnvCreateNoSync      = ffi::MDB_NOSYNC,
        #[doc="Open the environment in read-only mode. No write operations"]
        #[doc=" will be allowed. LMDB will still modify the lock file - except"]
        #[doc=" on read-only filesystems, where LMDB does not use locks."]
        const EnvCreateReadOnly    = ffi::MDB_RDONLY,
        #[doc="Flush system buffers to disk only once per transaction,"]
        #[doc=" omit the metadata flush. Defer that until the system flushes"]
        #[doc=" files to disk, or next non-MDB_RDONLY commit or mdb_env_sync()."]
        #[doc=" This optimization maintains database integrity, but a system"]
        #[doc=" crash may undo the last committed transaction. I.e. it"]
        #[doc=" preserves the ACI (atomicity, consistency, isolation) but"]
        #[doc=" not D (durability) database property. This flag may be changed"]
        #[doc=" at any time using mdb_env_set_flags()."]
        const EnvCreateNoMetaSync  = ffi::MDB_NOMETASYNC,
        #[doc="Use a writeable memory map unless MDB_RDONLY is set. This is"]
        #[doc="faster and uses fewer mallocs, but loses protection from"]
        #[doc="application bugs like wild pointer writes and other bad updates"]
        #[doc="into the database. Incompatible with nested"]
        #[doc="transactions. Processes with and without MDB_WRITEMAP on the"]
        #[doc="same environment do not cooperate well."]
        const EnvCreateWriteMap    = ffi::MDB_WRITEMAP,
        #[doc="When using MDB_WRITEMAP, use asynchronous flushes to disk. As"]
        #[doc="with MDB_NOSYNC, a system crash can then corrupt the database or"]
        #[doc="lose the last transactions. Calling mdb_env_sync() ensures"]
        #[doc="on-disk database integrity until next commit. This flag may be"]
        #[doc="changed at any time using mdb_env_set_flags()."]
        const EnvCreataMapAsync    = ffi::MDB_MAPASYNC,
        #[doc="Don't use Thread-Local Storage. Tie reader locktable slots to"]
        #[doc="ffi::MDB_txn objects instead of to threads. I.e. mdb_txn_reset()"]
        #[doc="keeps the slot reseved for the ffi::MDB_txn object. A thread may"]
        #[doc="use parallel read-only transactions. A read-only transaction may"]
        #[doc="span threads if the user synchronizes its use. Applications that"]
        #[doc="multiplex many user threads over individual OS threads need this"]
        #[doc="option. Such an application must also serialize the write"]
        #[doc="transactions in an OS thread, since LMDB's write locking is"]
        #[doc="unaware of the user threads."]
        const EnvCreateNoTls       = ffi::MDB_NOTLS,
        #[doc="Don't do any locking. If concurrent access is anticipated, the"]
        #[doc="caller must manage all concurrency itself. For proper operation"]
        #[doc="the caller must enforce single-writer semantics, and must ensure"]
        #[doc="that no readers are using old transactions while a writer is"]
        #[doc="active. The simplest approach is to use an exclusive lock so"]
        #[doc="that no readers may be active at all when a writer begins. "]
        const EnvCreateNoLock      = ffi::MDB_NOLOCK,
        #[doc="Turn off readahead. Most operating systems perform readahead on"]
        #[doc="read requests by default. This option turns it off if the OS"]
        #[doc="supports it. Turning it off may help random read performance"]
        #[doc="when the DB is larger than RAM and system RAM is full. The"]
        #[doc="option is not implemented on Windows."]
        const EnvCreateNoReadAhead = ffi::MDB_NORDAHEAD,
        #[doc="Don't initialize malloc'd memory before writing to unused spaces"]
        #[doc="in the data file. By default, memory for pages written to the"]
        #[doc="data file is obtained using malloc. While these pages may be"]
        #[doc="reused in subsequent transactions, freshly malloc'd pages will"]
        #[doc="be initialized to zeroes before use. This avoids persisting"]
        #[doc="leftover data from other code (that used the heap and"]
        #[doc="subsequently freed the memory) into the data file. Note that"]
        #[doc="many other system libraries may allocate and free memory from"]
        #[doc="the heap for arbitrary uses. E.g., stdio may use the heap for"]
        #[doc="file I/O buffers. This initialization step has a modest"]
        #[doc="performance cost so some applications may want to disable it"]
        #[doc="using this flag. This option can be a problem for applications"]
        #[doc="which handle sensitive data like passwords, and it makes memory"]
        #[doc="checkers like Valgrind noisy. This flag is not needed with"]
        #[doc="MDB_WRITEMAP, which writes directly to the mmap instead of using"]
        #[doc="malloc for pages. The initialization is also skipped if"]
        #[doc="MDB_RESERVE is used; the caller is expected to overwrite all of"]
        #[doc="the memory that was reserved in that case. This flag may be"]
        #[doc="changed at any time using mdb_env_set_flags()."]
        const EnvCreateNoMemInit   = ffi::MDB_NOMEMINIT
    }
}

// Per-database flags. Each constant takes its bit value from the matching
// `ffi::MDB_*` constant; the per-flag text is carried in `#[doc]` attributes.
bitflags! {
    #[doc = "A set of database flags"]

    pub flags DbFlags: c_uint {
        #[doc="Keys are strings to be compared in reverse order, from the"]
        #[doc=" end of the strings to the beginning. By default, Keys are"]
        #[doc=" treated as strings and compared from beginning to end."]
        const DbReverseKey   = ffi::MDB_REVERSEKEY,
        #[doc="Duplicate keys may be used in the database. (Or, from another"]
        #[doc="perspective, keys may have multiple data items, stored in sorted"]
        #[doc="order.) By default keys must be unique and may have only a"]
        #[doc="single data item."]
        const DbAllowDups    = ffi::MDB_DUPSORT,
        #[doc="Keys are binary integers in native byte order. Setting this"]
        #[doc="option requires all keys to be the same size, typically"]
        #[doc="sizeof(int) or sizeof(size_t)."]
        const DbIntKey       = ffi::MDB_INTEGERKEY,
        #[doc="This flag may only be used in combination with"]
        #[doc="ffi::MDB_DUPSORT. This option tells the library that the data"]
        #[doc="items for this database are all the same size, which allows"]
        #[doc="further optimizations in storage and retrieval. When all data"]
        #[doc="items are the same size, the ffi::MDB_GET_MULTIPLE and"]
        #[doc="ffi::MDB_NEXT_MULTIPLE cursor operations may be used to retrieve"]
        #[doc="multiple items at once."]
        const DbDupFixed     = ffi::MDB_DUPFIXED,
        #[doc="This option specifies that duplicate data items are also"]
        #[doc="integers, and should be sorted as such."]
        const DbAllowIntDups = ffi::MDB_INTEGERDUP,
        #[doc="This option specifies that duplicate data items should be"]
        #[doc=" compared as strings in reverse order."]
        const DbReversedDups = ffi::MDB_REVERSEDUP,
        #[doc="Create the named database if it doesn't exist. This option"]
        #[doc=" is not allowed in a read-only transaction or a read-only"]
        #[doc=" environment."]
        const DbCreate       = ffi::MDB_CREATE,
    }
}

/// A database handle paired with the transaction it is used through.
///
/// Most methods forward to that transaction, passing the stored handle along.
#[derive(Debug)]
pub struct Database<'a> {
    // Raw LMDB database identifier handed to the transaction calls.
    handle: ffi::MDB_dbi,
    // Transaction that the methods of this type forward to.
    txn: &'a NativeTransaction<'a>,
}

// FIXME: provide different interfaces for read-only/read-write databases
// FIXME: provide different interfaces for simple KV and storage with duplicates

impl<'a> Database<'a> {
    /// Pairs a raw database handle with the transaction that will carry out
    /// every operation on it.
    fn new_with_handle(handle: ffi::MDB_dbi, txn: &'a NativeTransaction<'a>) -> Database<'a> {
        Database { txn: txn, handle: handle }
    }

    /// Returns statistics for this database.
    pub fn stat(&'a self) -> MdbResult<ffi::MDB_stat> {
        let txn = self.txn;
        txn.stat(self.handle)
    }

    /// Looks up the value stored under `key`. With DbAllowDups this is the
    /// first value for that key.
    pub fn get<V: FromMdbValue + 'a>(&'a self, key: &ToMdbValue) -> MdbResult<V> {
        let txn = self.txn;
        txn.get(self.handle, key)
    }

    /// Stores `value` under `key`. With DbAllowDups a new item is added.
    pub fn set(&self, key: &ToMdbValue, value: &ToMdbValue) -> MdbResult<()> {
        let txn = self.txn;
        txn.set(self.handle, key, value)
    }

    /// Appends a new key-value pair, starting a new page instead of splitting
    /// an existing one if necessary. The key must be >= all existing keys in
    /// the database (otherwise a KeyExists error is returned).
    pub fn append<K: ToMdbValue, V: ToMdbValue>(&self, key: &K, value: &V) -> MdbResult<()> {
        let txn = self.txn;
        txn.append(self.handle, key, value)
    }

    /// Appends a new value for the given key (requires DbAllowDups), starting
    /// a new page instead of splitting an existing one if necessary. The value
    /// must be >= all existing values for that key (otherwise a KeyExists
    /// error is returned).
    pub fn append_duplicate<K: ToMdbValue, V: ToMdbValue>(&self, key: &K, value: &V) -> MdbResult<()> {
        let txn = self.txn;
        txn.append_duplicate(self.handle, key, value)
    }

    /// Stores `value` under `key`, failing if the key already exists, even
    /// when duplicates are allowed.
    pub fn insert(&self, key: &ToMdbValue, value: &ToMdbValue) -> MdbResult<()> {
        let txn = self.txn;
        txn.insert(self.handle, key, value)
    }

    /// Deletes the value stored under `key`.
    pub fn del(&self, key: &ToMdbValue) -> MdbResult<()> {
        let txn = self.txn;
        txn.del(self.handle, key)
    }

    /// Deletes the matching (key, value) pair. Should be used only with
    /// DbAllowDups.
    pub fn del_item(&self, key: &ToMdbValue, data: &ToMdbValue) -> MdbResult<()> {
        let txn = self.txn;
        txn.del_item(self.handle, key, data)
    }

    /// Creates a new cursor over this database.
    pub fn new_cursor(&'a self) -> MdbResult<Cursor<'a>> {
        let txn = self.txn;
        txn.new_cursor(self.handle)
    }

    /// Deletes this database; the `Database` value is consumed by the call.
    pub fn del_db(self) -> MdbResult<()> {
        let txn = self.txn;
        txn.del_db(self)
    }

    /// Removes every key/value pair from the database.
    pub fn clear(&self) -> MdbResult<()> {
        let txn = self.txn;
        txn.clear_db(self.handle)
    }

    /// Returns an iterator over all values in the database.
    pub fn iter(&'a self) -> MdbResult<CursorIterator<'a, CursorIter>> {
        self.txn.new_cursor(self.handle)
            .map(|cursor| CursorIterator::wrap(cursor, CursorIter))
    }

    /// Returns an iterator over keys >= `start_key`; `start_key` itself is
    /// included.
    pub fn keyrange_from<'c, K: ToMdbValue + 'c>(&'c self, start_key: &'c K) -> MdbResult<CursorIterator<'c, CursorFromKeyIter>> {
        self.txn.new_cursor(self.handle)
            .map(|cursor| CursorIterator::wrap(cursor, CursorFromKeyIter::new(start_key)))
    }

    /// Returns an iterator over keys less than `end_key`; `end_key` itself is
    /// not included.
    pub fn keyrange_to<'c, K: ToMdbValue + 'c>(&'c self, end_key: &'c K) -> MdbResult<CursorIterator<'c, CursorToKeyIter>> {
        self.txn.new_cursor(self.handle)
            .map(|cursor| CursorIterator::wrap(cursor, CursorToKeyIter::new(end_key)))
    }

    /// Returns an iterator over keys `start_key <= x < end_key`. That is,
    /// `start_key` is included in the iteration while `end_key` is excluded.
    pub fn keyrange_from_to<'c, K: ToMdbValue + 'c>(&'c self, start_key: &'c K, end_key: &'c K)
                               -> MdbResult<CursorIterator<'c, CursorKeyRangeIter>>
    {
        self.txn.new_cursor(self.handle).map(|cursor| {
            // `false`: the end key is not part of the range.
            CursorIterator::wrap(cursor, CursorKeyRangeIter::new(start_key, end_key, false))
        })
    }

    /// Returns an iterator for values between `start_key` and `end_key`
    /// (both included).
    /// Currently it works only for unique keys (i.e. it will skip
    /// multiple items when DB created with ffi::MDB_DUPSORT).
    /// Iterator is valid while cursor is valid
    pub fn keyrange<'c, K: ToMdbValue + 'c>(&'c self, start_key: &'c K, end_key: &'c K)
                               -> MdbResult<CursorIterator<'c, CursorKeyRangeIter>>
    {
        self.txn.new_cursor(self.handle).map(|cursor| {
            // `true`: the end key is part of the range.
            CursorIterator::wrap(cursor, CursorKeyRangeIter::new(start_key, end_key, true))
        })
    }

    /// Returns an iterator over all items of `key` (i.e. values sharing that key).
    pub fn item_iter<'c, 'db: 'c, K: ToMdbValue>(&'db self, key: &'c K) -> MdbResult<CursorIterator<'c, CursorItemIter<'c>>> {
        self.txn.new_cursor(self.handle)
            .map(|cursor| CursorIterator::<'c>::wrap(cursor, CursorItemIter::<'c>::new(key)))
    }

    /// Sets the key compare function for this database.
    ///
    /// Warning: This function must be called before any data access functions
    /// are used, otherwise data corruption may occur. The same comparison
    /// function must be used by every program accessing the database, every
    /// time the database is used.
    ///
    /// If not called, keys are compared lexically, with shorter keys collating
    /// before longer keys.
    ///
    /// Setting lasts for the lifetime of the underlying db handle.
    pub fn set_compare(&self, cmp_fn: extern "C" fn(*const MDB_val, *const MDB_val) -> c_int) -> MdbResult<()> {
        // NOTE(review): presumably sound because both handles come from live
        // wrapper objects - confirm against NativeTransaction.
        let rc = unsafe { ffi::mdb_set_compare(self.txn.handle, self.handle, cmp_fn) };
        match rc {
            ffi::MDB_SUCCESS => Ok(()),
            failure => Err(MdbError::new_with_code(failure)),
        }
    }

    /// Sets the value comparison function for values of the same key in this database.
    ///
    /// Warning: This function must be called before any data access functions
    /// are used, otherwise data corruption may occur. The same dupsort
    /// function must be used by every program accessing the database, every
    /// time the database is used.
    ///
    /// If not called, values are compared lexically, with shorter values collating
    /// before longer values.
    ///
    /// Only used when DbAllowDups is true.
    /// Setting lasts for the lifetime of the underlying db handle.
    pub fn set_dupsort(&self, cmp_fn: extern "C" fn(*const MDB_val, *const MDB_val) -> c_int) -> MdbResult<()> {
        // NOTE(review): presumably sound because both handles come from live
        // wrapper objects - confirm against NativeTransaction.
        let rc = unsafe { ffi::mdb_set_dupsort(self.txn.handle, self.handle, cmp_fn) };
        match rc {
            ffi::MDB_SUCCESS => Ok(()),
            failure => Err(MdbError::new_with_code(failure)),
        }
    }
}


/// Constructs an environment with settings which can't be
/// changed after opening. By default it tries to create the
/// corresponding dir if it doesn't exist; use `autocreate_dir()`
/// to override that behavior.
#[derive(Copy, Clone, Debug)]
pub struct EnvBuilder {
    // Flags handed to LMDB when the environment is created/opened
    flags: EnvCreateFlags,
    // Reader limit; `None` means the setter is never called on open
    max_readers: Option<usize>,
    // Named database limit; `None` means the setter is never called on open
    max_dbs: Option<usize>,
    // Map size; `None` means the setter is never called on open
    map_size: Option<u64>,
    // Whether `open` should check for (and create) the target directory
    autocreate_dir: bool,
}

impl EnvBuilder {
    pub fn new() -> EnvBuilder {
        EnvBuilder {
            flags: EnvCreateFlags::empty(),
            max_readers: None,
            max_dbs: None,
            map_size: None,
            autocreate_dir: true,
        }
    }

    /// Sets environment flags
    pub fn flags(mut self, flags: EnvCreateFlags) -> EnvBuilder {
        self.flags = flags;
        self
    }

    /// Sets max concurrent readers operating on environment
    pub fn max_readers(mut self, max_readers: usize) -> EnvBuilder {
        self.max_readers = Some(max_readers);
        self
    }

    /// Set max number of databases
    pub fn max_dbs(mut self, max_dbs: usize) -> EnvBuilder {
        self.max_dbs = Some(max_dbs);
        self
    }

    /// Sets max environment size, i.e. size in memory/disk of
    /// all data
    pub fn map_size(mut self, map_size: u64) -> EnvBuilder {
        self.map_size = Some(map_size);
        self
    }

    /// Sets whetever `lmdb-rs` should try to autocreate dir with default
    /// permissions on opening (default is true)
    pub fn autocreate_dir(mut self, autocreate_dir: bool)  -> EnvBuilder {
        self.autocreate_dir = autocreate_dir;
        self
    }

    /// Opens environment in specified path
    pub fn open<P: AsRef<Path>>(self, path: P, perms: u32) -> MdbResult<Environment> {
        let changeable_flags: EnvCreateFlags = EnvCreataMapAsync | EnvCreateNoMemInit | EnvCreateNoSync | EnvCreateNoMetaSync;

        let env: *mut ffi::MDB_env = ptr::null_mut();
        unsafe {
            let p_env: *mut *mut ffi::MDB_env = std::mem::transmute(&env);
            let _ = try_mdb!(ffi::mdb_env_create(p_env));
        }

        // Enable only flags which can be changed, otherwise it'll fail
        try_mdb!(unsafe { ffi::mdb_env_set_flags(env, self.flags.bits() & changeable_flags.bits(), 1)});

        if let Some(map_size) = self.map_size {
            try_mdb!(unsafe { ffi::mdb_env_set_mapsize(env, map_size as size_t)});
        }

        if let Some(max_readers) = self.max_readers {
            try_mdb!(unsafe { ffi::mdb_env_set_maxreaders(env, max_readers as u32)});
        }

        if let Some(max_dbs) = self.max_dbs {
            try_mdb!(unsafe { ffi::mdb_env_set_maxdbs(env, max_dbs as u32)});
        }

        if self.autocreate_dir {
            let _ = try!(EnvBuilder::check_path(&path, self.flags));
        }

        let is_readonly = self.flags.contains(EnvCreateReadOnly);

        let res = unsafe {
            // FIXME: revert back once `convert` is stable
            // let c_path = path.as_os_str().to_cstring().unwrap();
            let path_str = try!(path.as_ref().to_str().ok_or(MdbError::InvalidPath));
            let c_path = try!(CString::new(path_str).map_err(|_| MdbError::InvalidPath));

            ffi::mdb_env_open(mem::transmute(env), c_path.as_ref().as_ptr(), self.flags.bits(),
                              perms as ffi::mdb_mode_t)
        };

        drop(self);
        match res {
            ffi::MDB_SUCCESS => {
                Ok(Environment::from_raw(env, is_readonly))
            },
            _ => {
                unsafe { ffi::mdb_env_close(mem::transmute(env)); }
                Err(MdbError::new_with_code(res))
            }
        }

    }

    fn check_path<P: AsRef<Path>>(path: P, flags: EnvCreateFlags) -> MdbResult<()> {
        use std::{fs, io};

        if flags.contains(EnvCreateNoSubDir) {
            // FIXME: check parent dir existence/absence
            warn!("checking for path in NoSubDir mode isn't implemented yet");
            return Ok(());
        }

        // There should be a directory before open
        match fs::metadata(&path) {
            Ok(meta) => {
                if meta.is_dir() {
                    Ok(())
                } else {
                    Err(MdbError::InvalidPath)
                }
            },
            Err(e) => {
                if e.kind() == io::ErrorKind::NotFound {
                    fs::create_dir_all(path.as_ref().clone()).map_err(|e| {
                        error!("failed to auto create dir: {}", e);
                        MdbError::InvalidPath
                    })
                } else {
                    Err(MdbError::InvalidPath)
                }
            }
        }
    }
}

#[derive(Debug)]
struct EnvHandle(*mut ffi::MDB_env);

impl Drop for EnvHandle {
    fn drop(&mut self) {
        unsafe {
            if self.0 != ptr::null_mut() {
                ffi::mdb_env_close(self.0);
            }
        }
    }
}

/// Represents LMDB Environment. Should be opened using `EnvBuilder`
///
/// Clones share the same native handle and the same database cache.
#[derive(Debug)]
pub struct Environment {
    // Shared native handle; closed when the last `EnvHandle` owner is dropped
    env: Arc<EnvHandle>,
    // Cache of database name -> dbi handle, filled by `_open_db`
    db_cache: Arc<Mutex<UnsafeCell<HashMap<String, ffi::MDB_dbi>>>>,
    is_readonly: bool, // true if opened in 'read-only' mode
}

impl Environment {
    pub fn new() -> EnvBuilder {
        EnvBuilder::new()
    }

    fn from_raw(env: *mut ffi::MDB_env, is_readonly: bool) -> Environment {
        Environment {
            env: Arc::new(EnvHandle(env)),
            db_cache: Arc::new(Mutex::new(UnsafeCell::new(HashMap::new()))),
            is_readonly: is_readonly,
        }
    }

    /// Check for stale entries in the reader lock table.
    ///
    /// Returns the number of stale slots that were cleared.
    pub fn reader_check(&self) -> MdbResult<c_int> {
        let mut dead: c_int = 0;
        lift_mdb!(unsafe { ffi::mdb_reader_check(self.env.0, &mut dead as *mut c_int)}, dead)
    }

    /// Retrieve environment statistics
    pub fn stat(&self) -> MdbResult<ffi::MDB_stat> {
        let mut tmp: ffi::MDB_stat = unsafe { std::mem::zeroed() };
        lift_mdb!(unsafe { ffi::mdb_env_stat(self.env.0, &mut tmp)}, tmp)
    }

    pub fn info(&self) -> MdbResult<ffi::MDB_envinfo> {
        let mut tmp: ffi::MDB_envinfo = unsafe { std::mem::zeroed() };
        lift_mdb!(unsafe { ffi::mdb_env_info(self.env.0, &mut tmp)}, tmp)
    }

    /// Sync environment to disk
    pub fn sync(&self, force: bool) -> MdbResult<()> {
        lift_mdb!(unsafe { ffi::mdb_env_sync(self.env.0, if force {1} else {0})})
    }

    /// Sets map size.
    /// This can be called after [open](struct.EnvBuilder.html#method.open) if no transactions are active in this process.
    pub fn set_mapsize(&self, map_size: usize) -> MdbResult<()> {
        lift_mdb!(unsafe { ffi::mdb_env_set_mapsize(self.env.0, map_size as size_t)})
    }

    /// This one sets only flags which are available for change even
    /// after opening, see also [get_flags](#method.get_flags) and [get_all_flags](#method.get_all_flags)
    pub fn set_flags(&mut self, flags: EnvFlags, turn_on: bool) -> MdbResult<()> {
        lift_mdb!(unsafe {
            ffi::mdb_env_set_flags(self.env.0, flags.bits(), if turn_on {1} else {0})
        })
    }

    /// Get flags of environment, which could be changed after it was opened
    /// use [get_all_flags](#method.get_all_flags) if you need also creation time flags
    pub fn get_flags(&self) -> MdbResult<EnvFlags> {
        let tmp = try!(self.get_all_flags());
        Ok(EnvFlags::from_bits_truncate(tmp.bits()))
    }

    /// Get all flags of environment, including which were specified on creation
    /// See also [get_flags](#method.get_flags) if you're interested only in modifiable flags
    pub fn get_all_flags(&self) -> MdbResult<EnvCreateFlags> {
        let mut flags: c_uint = 0;
        lift_mdb!(unsafe {ffi::mdb_env_get_flags(self.env.0, &mut flags)}, EnvCreateFlags::from_bits_truncate(flags))
    }

    pub fn get_maxreaders(&self) -> MdbResult<c_uint> {
        let mut max_readers: c_uint = 0;
        lift_mdb!(unsafe {
            ffi::mdb_env_get_maxreaders(self.env.0, &mut max_readers)
        }, max_readers)
    }

    pub fn get_maxkeysize(&self) -> c_int {
        unsafe {ffi::mdb_env_get_maxkeysize(self.env.0)}
    }

    /// Creates a backup copy in specified file descriptor
    pub fn copy_to_fd(&self, fd: ffi::mdb_filehandle_t) -> MdbResult<()> {
        lift_mdb!(unsafe { ffi::mdb_env_copyfd(self.env.0, fd) })
    }

    /// Gets file descriptor of this environment
    pub fn get_fd(&self) -> MdbResult<ffi::mdb_filehandle_t> {
        let mut fd = 0;
        lift_mdb!({ unsafe { ffi::mdb_env_get_fd(self.env.0, &mut fd) }}, fd)
    }

    /// Creates a backup copy in specified path
    // FIXME: check who is responsible for creating path: callee or caller
    pub fn copy_to_path<P: AsRef<Path>>(&self, path: P) -> MdbResult<()> {
        // FIXME: revert back once `convert` is stable
        // let c_path = path.as_os_str().to_cstring().unwrap();
        let path_str = try!(path.as_ref().to_str().ok_or(MdbError::InvalidPath));
        let c_path = try!(CString::new(path_str).map_err(|_| MdbError::InvalidPath));

        unsafe {
            lift_mdb!(ffi::mdb_env_copy(self.env.0, c_path.as_ref().as_ptr()))
        }
    }

    fn create_transaction(&self, parent: Option<NativeTransaction>, flags: c_uint) -> MdbResult<NativeTransaction> {
        let mut handle: *mut ffi::MDB_txn = ptr::null_mut();
        let parent_handle = match parent {
            Some(t) => t.handle,
            _ => ptr::null_mut()
        };

        lift_mdb!(unsafe { ffi::mdb_txn_begin(self.env.0, parent_handle, flags, &mut handle) },
                 NativeTransaction::new_with_handle(handle, flags as usize, self))
    }

    /// Creates a new read-write transaction
    ///
    /// Use `get_reader` to get much faster lock-free alternative
    pub fn new_transaction(&self) -> MdbResult<Transaction> {
        if self.is_readonly {
            return Err(MdbError::StateError("Error: creating read-write transaction in read-only environment".to_owned()))
        }
        self.create_transaction(None, 0)
            .and_then(|txn| Ok(Transaction::new_with_native(txn)))
    }

    /// Creates a readonly transaction
    pub fn get_reader(&self) -> MdbResult<ReadonlyTransaction> {
        self.create_transaction(None, ffi::MDB_RDONLY)
            .and_then(|txn| Ok(ReadonlyTransaction::new_with_native(txn)))
    }

    fn _open_db(&self, db_name: & str, flags: DbFlags, force_creation: bool) -> MdbResult<ffi::MDB_dbi> {
        debug!("Opening {} (create={}, read_only={})", db_name, force_creation, self.is_readonly);
        // From LMDB docs for mdb_dbi_open:
        //
        // This function must not be called from multiple concurrent
        // transactions. A transaction that uses this function must finish
        // (either commit or abort) before any other transaction may use
        // this function
        match self.db_cache.lock() {
            Err(_) => Err(MdbError::CacheError),
            Ok(guard) => {
                let ref cell = *guard;
                let cache = cell.get();

                unsafe {
                    if let Some(db) = (*cache).get(db_name) {
                        debug!("Cached value for {}: {}", db_name, *db);
                        return Ok(*db);
                    }
                }

                let mut txn = {
                    let txflags = if self.is_readonly { ffi::MDB_RDONLY } else { 0 };
                    try!(self.create_transaction(None, txflags))
                };
                let opt_name = if db_name.len() > 0 {Some(db_name)} else {None};
                let flags = if force_creation {flags | DbCreate} else {flags - DbCreate};

                let mut db: ffi::MDB_dbi = 0;
                let db_res = match opt_name {
                    None => unsafe { ffi::mdb_dbi_open(txn.handle, ptr::null(), flags.bits(), &mut db) },
                    Some(db_name) => {
                        let db_name = CString::new(db_name.as_bytes()).unwrap();
                        unsafe {
                            ffi::mdb_dbi_open(txn.handle, db_name.as_ptr(), flags.bits(), &mut db)
                        }
                    }
                };

                try_mdb!(db_res);
                try!(txn.commit());

                debug!("Caching: {} -> {}", db_name, db);
                unsafe {
                    (*cache).insert(db_name.to_owned(), db);
                };

                Ok(db)
            }
        }
    }

    /// Opens existing DB
    pub fn get_db(& self, db_name: &str, flags: DbFlags) -> MdbResult<DbHandle> {
        let db = try!(self._open_db(db_name, flags, false));
        Ok(DbHandle {handle: db, flags: flags})
    }

    /// Opens or creates a DB
    pub fn create_db(&self, db_name: &str, flags: DbFlags) -> MdbResult<DbHandle> {
        let db = try!(self._open_db(db_name, flags, true));
        Ok(DbHandle {handle: db, flags: flags})
    }

    /// Opens default DB with specified flags
    pub fn get_default_db(&self, flags: DbFlags) -> MdbResult<DbHandle> {
        self.get_db("", flags)
    }

    fn drop_db_from_cache(&self, handle: ffi::MDB_dbi) {
        match self.db_cache.lock() {
            Err(_) => (),
            Ok(guard) => {
                let ref cell = *guard;

                unsafe {
                    let cache = cell.get();

                    let mut key = None;
                    for (k, v) in (*cache).iter() {
                        if *v == handle {
                            key = Some(k);
                            break;
                        }
                    }

                    if let Some(key) = key {
                        (*cache).remove(key);
                    }
                }
            }
        }
    }
}

// NOTE(review): these impls assert that `Environment` may be shared and sent
// across threads. Presumably this relies on the `Mutex` around `db_cache` and
// on the native environment handle being usable from several threads — confirm.
unsafe impl Sync for Environment {}
unsafe impl Send for Environment {}

impl Clone for Environment {
    fn clone(&self) -> Environment {
        Environment {
            env: self.env.clone(),
            db_cache: self.db_cache.clone(),
            is_readonly: self.is_readonly,
        }
    }
}

#[allow(dead_code)]
#[derive(Copy, Clone, Debug)]
/// A handle to a database
///
/// It can be cached to avoid opening the db on every access.
/// In the current state it is unsafe, as another thread
/// can ask to drop the database it refers to.
pub struct DbHandle {
    // Raw LMDB database identifier
    handle: ffi::MDB_dbi,
    // Flags the handle was requested with
    flags: DbFlags
}

// NOTE(review): the handle is plain `Copy` data; see the caveat above about
// another thread dropping the database.
unsafe impl Sync for DbHandle {}
unsafe impl Send for DbHandle {}

/// Lifecycle state tracked for a `NativeTransaction`.
#[derive(Copy, PartialEq, Debug, Eq, Clone)]
enum TransactionState {
    Normal,   // Normal, any operation possible
    Released, // Released (reset on readonly), has to be renewed
    Invalid,  // Invalid, no further operation possible
}

/// Thin wrapper around a raw LMDB transaction handle which tracks the
/// transaction's state; shared by `Transaction` and `ReadonlyTransaction`.
#[derive(Debug)]
struct NativeTransaction<'a> {
    // Raw LMDB transaction handle
    handle: *mut ffi::MDB_txn,
    // Environment the transaction was created in
    env: &'a Environment,
    // Flags the transaction was begun with (checked for `MDB_RDONLY`)
    flags: usize,
    // Current lifecycle state; most operations require `Normal`
    state: TransactionState,
}

impl<'a> NativeTransaction<'a> {
    fn new_with_handle(h: *mut ffi::MDB_txn, flags: usize, env: &Environment) -> NativeTransaction {
        // debug!("new native txn");
        NativeTransaction {
            handle: h,
            flags: flags,
            state: TransactionState::Normal,
            env: env,
        }
    }

    fn is_readonly(&self) -> bool {
        (self.flags as u32 & ffi::MDB_RDONLY) == ffi::MDB_RDONLY
    }

    fn commit(&mut self) -> MdbResult<()> {
        assert_state_eq!(txn, self.state, TransactionState::Normal);
        debug!("commit txn");
        self.state = if self.is_readonly() {
            TransactionState::Released
        } else {
            TransactionState::Invalid
        };
        try_mdb!(unsafe { ffi::mdb_txn_commit(self.handle) } );
        Ok(())
    }

    fn abort(&mut self) {
        if self.state != TransactionState::Normal {
            debug!("Can't abort transaction: current state {:?}", self.state)
        } else {
            debug!("abort txn");
            unsafe { ffi::mdb_txn_abort(self.handle); }
            self.state = if self.is_readonly() {
                TransactionState::Released
            } else {
                TransactionState::Invalid
            };
        }
    }

    /// Resets read only transaction, handle is kept. Must be followed
    /// by a call to `renew`
    fn reset(&mut self) {
        if self.state != TransactionState::Normal {
            debug!("Can't reset transaction: current state {:?}", self.state);
        } else {
            unsafe { ffi::mdb_txn_reset(self.handle); }
            self.state = TransactionState::Released;
        }
    }

    /// Acquires a new reader lock after it was released by reset
    fn renew(&mut self) -> MdbResult<()> {
        assert_state_eq!(txn, self.state, TransactionState::Released);
        try_mdb!(unsafe {ffi::mdb_txn_renew(self.handle)});
        self.state = TransactionState::Normal;
        Ok(())
    }

    fn new_child(&self, flags: c_uint) -> MdbResult<NativeTransaction> {
        let mut out: *mut ffi::MDB_txn = ptr::null_mut();
        try_mdb!(unsafe { ffi::mdb_txn_begin(ffi::mdb_txn_env(self.handle), self.handle, flags, &mut out) });
        Ok(NativeTransaction::new_with_handle(out, flags as usize, self.env))
    }

    /// Used in Drop to switch state
    fn silent_abort(&mut self) {
        if self.state == TransactionState::Normal {
            debug!("silent abort");
            unsafe {ffi::mdb_txn_abort(self.handle);}
            self.state = TransactionState::Invalid;
        }
    }

    fn get_value<V: FromMdbValue + 'a>(&'a self, db: ffi::MDB_dbi, key: &ToMdbValue) -> MdbResult<V> {
        let mut key_val = key.to_mdb_value();
        unsafe {
            let mut data_val: MdbValue = std::mem::zeroed();
            try_mdb!(ffi::mdb_get(self.handle, db, &mut key_val.value, &mut data_val.value));
            Ok(FromMdbValue::from_mdb_value(&data_val))
        }
    }

    fn get<V: FromMdbValue + 'a>(&'a self, db: ffi::MDB_dbi, key: &ToMdbValue) -> MdbResult<V> {
        assert_state_eq!(txn, self.state, TransactionState::Normal);
        self.get_value(db, key)
    }

    fn set_value(&self, db: ffi::MDB_dbi, key: &ToMdbValue, value: &ToMdbValue) -> MdbResult<()> {
        self.set_value_with_flags(db, key, value, 0)
    }

    fn set_value_with_flags(&self, db: ffi::MDB_dbi, key: &ToMdbValue, value: &ToMdbValue, flags: c_uint) -> MdbResult<()> {
        unsafe {
            let mut key_val = key.to_mdb_value();
            let mut data_val = value.to_mdb_value();

            lift_mdb!(ffi::mdb_put(self.handle, db, &mut key_val.value, &mut data_val.value, flags))
        }
    }

    /// Sets a new value for key, in case of enabled duplicates
    /// it actually appends a new value
    // FIXME: think about creating explicit separation of
    // all traits for databases with dup keys
    fn set(&self, db: ffi::MDB_dbi, key: &ToMdbValue, value: &ToMdbValue) -> MdbResult<()> {
        assert_state_eq!(txn, self.state, TransactionState::Normal);
        self.set_value(db, key, value)
    }

    fn append(&self, db: ffi::MDB_dbi, key: &ToMdbValue, value: &ToMdbValue) -> MdbResult<()> {
        assert_state_eq!(txn, self.state, TransactionState::Normal);
        self.set_value_with_flags(db, key, value, ffi::MDB_APPEND)
    }

    fn append_duplicate(&self, db: ffi::MDB_dbi, key: &ToMdbValue, value: &ToMdbValue) -> MdbResult<()> {
        assert_state_eq!(txn, self.state, TransactionState::Normal);
        self.set_value_with_flags(db, key, value, ffi::MDB_APPENDDUP)
    }

    /// Set the value for key only if the key does not exist in the database,
    /// even if the database supports duplicates.
    fn insert(&self, db: ffi::MDB_dbi, key: &ToMdbValue, value: &ToMdbValue) -> MdbResult<()> {
        assert_state_eq!(txn, self.state, TransactionState::Normal);
        self.set_value_with_flags(db, key, value, ffi::MDB_NOOVERWRITE)
    }

    /// Deletes all values by key
    fn del_value(&self, db: ffi::MDB_dbi, key: &ToMdbValue) -> MdbResult<()> {
        unsafe {
            let mut key_val = key.to_mdb_value();
            lift_mdb!(ffi::mdb_del(self.handle, db, &mut key_val.value, ptr::null_mut()))
        }
    }

    /// If duplicate keys are allowed deletes value for key which is equal to data
    fn del_item(&self, db: ffi::MDB_dbi, key: &ToMdbValue, data: &ToMdbValue) -> MdbResult<()> {
        assert_state_eq!(txn, self.state, TransactionState::Normal);
        unsafe {
            let mut key_val = key.to_mdb_value();
            let mut data_val = data.to_mdb_value();

            lift_mdb!(ffi::mdb_del(self.handle, db, &mut key_val.value, &mut data_val.value))
        }
    }

    /// Deletes all values for key
    fn del(&self, db: ffi::MDB_dbi, key: &ToMdbValue) -> MdbResult<()> {
        assert_state_eq!(txn, self.state, TransactionState::Normal);
        self.del_value(db, key)
    }

    /// Creates a new cursor in current transaction tied to db
    fn new_cursor(&'a self, db: ffi::MDB_dbi) -> MdbResult<Cursor<'a>> {
        Cursor::new(self, db)
    }

    /// Deletes provided database completely
    fn del_db(&self, db: Database) -> MdbResult<()> {
        assert_state_eq!(txn, self.state, TransactionState::Normal);
        unsafe {
            self.env.drop_db_from_cache(db.handle);
            lift_mdb!(ffi::mdb_drop(self.handle, db.handle, 1))
        }
    }

    /// Empties provided database
    fn clear_db(&self, db: ffi::MDB_dbi) -> MdbResult<()> {
        assert_state_eq!(txn, self.state, TransactionState::Normal);
        unsafe {
            lift_mdb!(ffi::mdb_drop(self.handle, db, 0))
        }
    }

    /// Retrieves provided database's statistics
    fn stat(&self, db: ffi::MDB_dbi) -> MdbResult<ffi::MDB_stat> {
        let mut tmp: ffi::MDB_stat = unsafe { std::mem::zeroed() };
        lift_mdb!(unsafe { ffi::mdb_stat(self.handle, db, &mut tmp)}, tmp)
    }

    /*
    fn get_db(&self, name: &str, flags: DbFlags) -> MdbResult<Database> {
        self.env.get_db(name, flags)
            .and_then(|db| Ok(Database::new_with_handle(db.handle, self)))
    }
    */

    /*
    fn get_or_create_db(&self, name: &str, flags: DbFlags) -> MdbResult<Database> {
        self.get_db(name, flags | DbCreate)
    }
    */
}

// Dropping a native transaction delegates to `silent_abort`, so a
// transaction that goes out of scope unfinished is aborted there.
impl<'a> Drop for NativeTransaction<'a> {
    fn drop(&mut self) {
        //debug!("Dropping native transaction!");
        self.silent_abort();
    }
}

/// Read-write transaction handed out by `Environment::new_transaction`.
#[derive(Debug)]
pub struct Transaction<'a> {
    inner: NativeTransaction<'a>,
}

impl<'a> Transaction<'a> {
    /// Wraps a native transaction.
    fn new_with_native(txn: NativeTransaction<'a>) -> Transaction<'a> {
        Transaction { inner: txn }
    }

    /// Begins a read-write child transaction.
    pub fn new_child(&self) -> MdbResult<Transaction> {
        match self.inner.new_child(0) {
            Ok(child) => Ok(Transaction::new_with_native(child)),
            Err(e) => Err(e),
        }
    }

    /// Begins a child transaction with the `MDB_RDONLY` flag.
    pub fn new_ro_child(&self) -> MdbResult<ReadonlyTransaction> {
        match self.inner.new_child(ffi::MDB_RDONLY) {
            Ok(child) => Ok(ReadonlyTransaction::new_with_native(child)),
            Err(e) => Err(e),
        }
    }

    /// Commits transaction, moves it out
    pub fn commit(mut self) -> MdbResult<()> {
        self.inner.commit()
    }

    /// Aborts transaction, moves it out
    pub fn abort(mut self) {
        self.inner.abort();
    }

    /// Binds a database handle to this transaction, yielding a `Database`.
    pub fn bind(&self, db_handle: &DbHandle) -> Database {
        let native = &self.inner;
        Database::new_with_handle(db_handle.handle, native)
    }
}


/// Read-only transaction handed out by `Environment::get_reader`.
#[derive(Debug)]
pub struct ReadonlyTransaction<'a> {
    inner: NativeTransaction<'a>,
}


impl<'a> ReadonlyTransaction<'a> {
    /// Wraps a native transaction.
    fn new_with_native(txn: NativeTransaction<'a>) -> ReadonlyTransaction<'a> {
        ReadonlyTransaction { inner: txn }
    }

    /// Begins a child transaction with the `MDB_RDONLY` flag.
    pub fn new_ro_child(&self) -> MdbResult<ReadonlyTransaction> {
        match self.inner.new_child(ffi::MDB_RDONLY) {
            Ok(child) => Ok(ReadonlyTransaction::new_with_native(child)),
            Err(e) => Err(e),
        }
    }

    /// Aborts transaction. But readonly transaction could be
    /// reused later by calling `renew`
    pub fn abort(&mut self) {
        let native = &mut self.inner;
        native.abort();
    }

    /// Resets read only transaction, handle is kept. Must be followed
    /// by call to `renew`
    pub fn reset(&mut self) {
        let native = &mut self.inner;
        native.reset();
    }

    /// Acquires a new reader lock after transaction
    /// `abort` or `reset`
    pub fn renew(&mut self) -> MdbResult<()> {
        let native = &mut self.inner;
        native.renew()
    }

    /// Binds a database handle to this transaction, yielding a `Database`.
    pub fn bind(&self, db_handle: &DbHandle) -> Database {
        let native = &self.inner;
        Database::new_with_handle(db_handle.handle, native)
    }
}

/// Helper answering "less than", where `or_equal` decides at runtime
/// whether equality counts as well ("less than or equal to").
trait IsLess {
    fn is_less(&self, or_equal: bool) -> bool;
}

impl IsLess for Ordering {
    fn is_less(&self, or_equal: bool) -> bool {
        // `Less` always qualifies; `Equal` only when the caller allows it
        *self == Ordering::Less || (or_equal && *self == Ordering::Equal)
    }
}

// A failed comparison never counts as "less"; a successful one defers
// to the `Ordering` implementation.
impl IsLess for MdbResult<Ordering> {
    fn is_less(&self, or_equal: bool) -> bool {
        if let Ok(ord) = *self {
            ord.is_less(or_equal)
        } else {
            false
        }
    }
}

/// Cursor over a database within a transaction; wraps a raw LMDB cursor
/// and caches the key/data values of the last cursor operation.
#[derive(Debug)]
pub struct Cursor<'txn> {
    // Raw LMDB cursor, closed on drop
    handle: *mut ffi::MDB_cursor,
    // Data value written by / passed to the last cursor operation
    data_val: ffi::MDB_val,
    // Key value written by / passed to the last cursor operation
    key_val: ffi::MDB_val,
    // Transaction the cursor was opened in
    txn: &'txn NativeTransaction<'txn>,
    // Database the cursor iterates over
    db: ffi::MDB_dbi,
    // False when `key_val` must be re-read from the cursor before use
    valid_key: bool,
}


impl<'txn> Cursor<'txn> {
    /// Opens a cursor for `db` inside `txn`.
    fn new(txn: &'txn NativeTransaction, db: ffi::MDB_dbi) -> MdbResult<Cursor<'txn>> {
        debug!("Opening cursor in {}", db);
        let mut tmp: *mut ffi::MDB_cursor = std::ptr::null_mut();
        try_mdb!(unsafe { ffi::mdb_cursor_open(txn.handle, db, &mut tmp) });
        Ok(Cursor {
            handle: tmp,
            data_val: unsafe { std::mem::zeroed() },
            key_val: unsafe { std::mem::zeroed() },
            txn: txn,
            db: db,
            valid_key: false,
        })
    }

    /// Performs cursor operation `op` using the cached key/data values
    /// as in/out arguments.
    fn navigate(&mut self, op: ffi::MDB_cursor_op) -> MdbResult<()> {
        self.valid_key = false;

        let res = unsafe {
            ffi::mdb_cursor_get(self.handle, &mut self.key_val, &mut self.data_val, op)
        };
        match res {
            ffi::MDB_SUCCESS => {
                // MDB_SET is the only cursor operation which doesn't
                // write back a new value. In this case any access to
                // cursor key value should cause a cursor retrieval
                // to get back pointer to database owned memory instead
                // of value used to set the cursor as it might be
                // already destroyed and there is no need to borrow it
                self.valid_key = op != ffi::MDB_cursor_op::MDB_SET;
                Ok(())
            },
            e => Err(MdbError::new_with_code(e))
        }
    }

    /// Loads `key` (and `value`, or a zeroed value when `None`) into the
    /// cached values and performs `op`.
    fn move_to<K, V>(&mut self, key: &K, value: Option<&V>, op: ffi::MDB_cursor_op) -> MdbResult<()>
        where K: ToMdbValue, V: ToMdbValue {
        self.key_val = key.to_mdb_value().value;
        self.data_val = match value {
            Some(v) => v.to_mdb_value().value,
            _ => unsafe {std::mem::zeroed() }
        };

        self.navigate(op)
    }

    /// Moves cursor to first entry
    pub fn to_first(&mut self) -> MdbResult<()> {
        self.navigate(ffi::MDB_cursor_op::MDB_FIRST)
    }

    /// Moves cursor to last entry
    pub fn to_last(&mut self) -> MdbResult<()> {
        self.navigate(ffi::MDB_cursor_op::MDB_LAST)
    }

    /// Moves cursor to first entry for key if it exists
    pub fn to_key<'k, K: ToMdbValue>(&mut self, key: &'k K) -> MdbResult<()> {
        self.move_to(key, None::<&MdbValue<'k>>, ffi::MDB_cursor_op::MDB_SET_KEY)
    }

    /// Moves cursor to first entry for key greater than
    /// or equal to key
    pub fn to_gte_key<'k, K: ToMdbValue>(&mut self, key: &'k K) -> MdbResult<()> {
        self.move_to(key, None::<&MdbValue<'k>>, ffi::MDB_cursor_op::MDB_SET_RANGE)
    }

    /// Moves cursor to specific item (for example, if cursor
    /// already points to a correct key and you need to delete
    /// a specific item through cursor)
    pub fn to_item<K, V>(&mut self, key: &K, value: & V) -> MdbResult<()> where K: ToMdbValue, V: ToMdbValue {
        self.move_to(key, Some(value), ffi::MDB_cursor_op::MDB_GET_BOTH)
    }

    /// Moves cursor to nearest item.
    pub fn to_gte_item<K, V>(&mut self, key: &K, value: & V) -> MdbResult<()> where K: ToMdbValue, V: ToMdbValue {
        self.move_to(key, Some(value), ffi::MDB_cursor_op::MDB_GET_BOTH_RANGE)
    }

    /// Moves cursor to next key, i.e. skip items
    /// with duplicate keys
    pub fn to_next_key(&mut self) -> MdbResult<()> {
        self.navigate(ffi::MDB_cursor_op::MDB_NEXT_NODUP)
    }

    /// Moves cursor to next item with the same key as current
    pub fn to_next_item(&mut self) -> MdbResult<()> {
        self.navigate(ffi::MDB_cursor_op::MDB_NEXT_DUP)
    }

    /// Moves cursor to prev entry, i.e. skips items
    /// with duplicate keys
    pub fn to_prev_key(&mut self) -> MdbResult<()> {
        self.navigate(ffi::MDB_cursor_op::MDB_PREV_NODUP)
    }

    /// Moves cursor to prev item with the same key as current
    pub fn to_prev_item(&mut self) -> MdbResult<()> {
        self.navigate(ffi::MDB_cursor_op::MDB_PREV_DUP)
    }

    /// Moves cursor to first item with the same key as current
    pub fn to_first_item(&mut self) -> MdbResult<()> {
        self.navigate(ffi::MDB_cursor_op::MDB_FIRST_DUP)
    }

    /// Moves cursor to last item with the same key as current
    pub fn to_last_item(&mut self) -> MdbResult<()> {
        self.navigate(ffi::MDB_cursor_op::MDB_LAST_DUP)
    }

    /// Retrieves current key/value as tuple
    pub fn get<'a, T: FromMdbValue + 'a, U: FromMdbValue + 'a>(&'a mut self) -> MdbResult<(T, U)> {
        let (k, v) = try!(self.get_plain());

        unsafe {
            Ok((FromMdbValue::from_mdb_value(mem::transmute(&k)),
                FromMdbValue::from_mdb_value(mem::transmute(&v))))
        }
    }

    /// Retrieves current value
    pub fn get_value<'a, V: FromMdbValue + 'a>(&'a mut self) -> MdbResult<V> {
        let (_, v) = try!(self.get_plain());

        unsafe {
            Ok(FromMdbValue::from_mdb_value(mem::transmute(&v)))
        }
    }

    /// Retrieves current key
    pub fn get_key<'a, K: FromMdbValue + 'a>(&'a mut self) -> MdbResult<K> {
        let (k, _) = try!(self.get_plain());

        unsafe {
            Ok(FromMdbValue::from_mdb_value(mem::transmute(&k)))
        }
    }

    /// Compares the cursor's current key with the specified other one.
    #[inline]
    fn cmp_key(&mut self, other: &MdbValue) -> MdbResult<Ordering> {
        let (k, _) = try!(self.get_plain());
        let mut kval = k.value;
        // Borrow the raw value field directly instead of transmuting the
        // wrapper reference, which relied on the wrapper's layout.
        let other_val = &other.value as *const ffi::MDB_val as *mut ffi::MDB_val;
        let cmp = unsafe {
            ffi::mdb_cmp(self.txn.handle, self.db, &mut kval, other_val)
        };
        Ok(match cmp {
            n if n < 0 => Ordering::Less,
            n if n > 0 => Ordering::Greater,
            _          => Ordering::Equal,
        })
    }

    /// Re-reads the current key from the cursor when the cached one is
    /// not known to be valid.
    #[inline]
    fn ensure_key_valid(&mut self) -> MdbResult<()> {
        // If key might be invalid simply perform cursor get to be sure
        // it points to database memory instead of user one
        if !self.valid_key {
            unsafe {
                try_mdb!(ffi::mdb_cursor_get(self.handle, &mut self.key_val,
                                             ptr::null_mut(),
                                             ffi::MDB_cursor_op::MDB_GET_CURRENT));
            }
            self.valid_key = true;
        }
        Ok(())
    }

    /// Returns the cached key/data pair, refreshing the key first if needed.
    #[inline]
    fn get_plain(&mut self) -> MdbResult<(MdbValue<'txn>, MdbValue<'txn>)> {
        try!(self.ensure_key_valid());
        let k = MdbValue {value: self.key_val, marker: ::std::marker::PhantomData};
        let v = MdbValue {value: self.data_val, marker: ::std::marker::PhantomData};

        Ok((k, v))
    }

    #[allow(dead_code)]
    // This one is used for debugging, so it's OK to leave it for a while
    fn dump_value(&self, prefix: &str) {
        if self.valid_key {
            println!("{}: key {:?}, data {:?}", prefix,
                     self.key_val,
                     self.data_val);
        }
    }

    /// Puts `value` under the cached key with the given `mdb_cursor_put` flags.
    fn set_value<V: ToMdbValue>(&mut self, value: &V, flags: c_uint) -> MdbResult<()> {
        try!(self.ensure_key_valid());
        self.data_val = value.to_mdb_value().value;
        lift_mdb!(unsafe {ffi::mdb_cursor_put(self.handle, &mut self.key_val, &mut self.data_val, flags)})
    }

    /// Puts the `key`/`value` pair through the cursor with the given flags.
    pub fn set<K: ToMdbValue, V: ToMdbValue>(&mut self, key: &K, value: &V, flags: c_uint) -> MdbResult<()> {
        self.key_val = key.to_mdb_value().value;
        // The caller's key is used as is, so no refresh must happen in `set_value`
        self.valid_key = true;
        let res = self.set_value(value, flags);
        self.valid_key = false;
        res
    }

    /// Overwrites value for current item
    /// Note: overwrites max cur_value.len() bytes
    pub fn replace<V: ToMdbValue>(&mut self, value: &V) -> MdbResult<()> {
        let res = self.set_value(value, ffi::MDB_CURRENT);
        self.valid_key = false;
        res
    }

    /// Adds a new item when created with allowed duplicates
    pub fn add_item<V: ToMdbValue>(&mut self, value: &V) -> MdbResult<()> {
        let res = self.set_value(value, 0);
        self.valid_key = false;
        res
    }

    /// Deletes at the cursor position with the given `mdb_cursor_del` flags.
    fn del_value(&mut self, flags: c_uint) -> MdbResult<()> {
        lift_mdb!(unsafe { ffi::mdb_cursor_del(self.handle, flags) })
    }

    /// Deletes current key
    pub fn del(&mut self) -> MdbResult<()> {
        self.del_all()
    }

    /// Deletes only current item
    ///
    /// Note that it doesn't check anything so it is caller responsibility
    /// to make sure that correct item is deleted if, for example, caller
    /// wants to delete only items of current key
    pub fn del_item(&mut self) -> MdbResult<()> {
        let res = self.del_value(0);
        self.valid_key = false;
        res
    }

    /// Deletes all items with same key as current
    pub fn del_all(&mut self) -> MdbResult<()> {
        let res = self.del_value(ffi::MDB_NODUPDATA);
        // As in `del_item`, the cached key may refer to the deleted entry,
        // so it has to be re-read before the next use.
        self.valid_key = false;
        res
    }

    /// Returns count of items with the same key as current
    pub fn item_count(&self) -> MdbResult<size_t> {
        let mut tmp: size_t = 0;
        lift_mdb!(unsafe {ffi::mdb_cursor_count(self.handle, &mut tmp)}, tmp)
    }

    /// Consumes the cursor and pairs it with key `k` for item-level access.
    pub fn get_item<'k, K: ToMdbValue>(self, k: &'k K) -> CursorItemAccessor<'txn, 'k, K> {
        CursorItemAccessor {
            cursor: self,
            key: k
        }
    }
}

// Closes the native cursor when the wrapper goes out of scope.
impl<'txn> Drop for Cursor<'txn> {
    fn drop(&mut self) {
        unsafe { ffi::mdb_cursor_close(self.handle) };
    }
}

/// A cursor paired with a fixed key, giving access to the items stored
/// under that key.
#[derive(Debug)]
pub struct CursorItemAccessor<'c, 'k, K: 'k> {
    cursor: Cursor<'c>,
    key: &'k K,
}

impl<'k, 'c: 'k, K: ToMdbValue> CursorItemAccessor<'c, 'k, K> {
    /// Positions the cursor on the key and returns the value found there.
    pub fn get<'a, V: FromMdbValue + 'a>(&'a mut self) -> MdbResult<V> {
        match self.cursor.to_key(self.key) {
            Ok(()) => self.cursor.get_value(),
            Err(e) => Err(e),
        }
    }

    /// Puts `v` under the key with no put flags.
    pub fn add<V: ToMdbValue>(&mut self, v: &V) -> MdbResult<()> {
        let key = self.key;
        self.cursor.set(key, v, 0)
    }

    /// Positions the cursor on the exact key/`v` item and deletes it.
    pub fn del<V: ToMdbValue>(&mut self, v: &V) -> MdbResult<()> {
        match self.cursor.to_item(self.key, v) {
            Ok(()) => self.cursor.del_item(),
            Err(e) => Err(e),
        }
    }

    /// Positions the cursor on the key and deletes all of its items.
    pub fn del_all(&mut self) -> MdbResult<()> {
        match self.cursor.to_key(self.key) {
            Ok(()) => self.cursor.del_all(),
            Err(e) => Err(e),
        }
    }

    /// Gives the wrapped cursor back, consuming the accessor.
    pub fn into_inner(self) -> Cursor<'c> {
        self.cursor
    }
}


#[derive(Debug)]
pub struct CursorValue<'cursor> {
    key: MdbValue<'cursor>,
    value: MdbValue<'cursor>,
    marker: ::std::marker::PhantomData<&'cursor ()>,
}

/// Accessors for `CursorValue`. Conversion is lazy: the stored key and value
/// are only turned into a concrete type when one of these methods is
/// called, and the result is bound to the `'cursor` lifetime.
impl<'cursor> CursorValue<'cursor> {
    /// Converts the stored key into `T`.
    pub fn get_key<T: FromMdbValue + 'cursor>(&'cursor self) -> T {
        T::from_mdb_value(&self.key)
    }

    /// Converts the stored value into `T`.
    pub fn get_value<T: FromMdbValue + 'cursor>(&'cursor self) -> T {
        T::from_mdb_value(&self.value)
    }

    /// Converts both parts at once, key first, and returns them as a tuple.
    pub fn get<T: FromMdbValue + 'cursor, U: FromMdbValue + 'cursor>(&'cursor self) -> (T, U) {
        let key = self.get_key();
        let value = self.get_value();
        (key, value)
    }
}

/// Allows the creation of custom cursor iteration behaviours.
///
/// `CursorIterator` calls `init_cursor` once when it is built and
/// `move_to_next` after every item it reads.
pub trait IterateCursor {
    /// Returns true if initialization successful, for example that
    /// the key exists.
    fn init_cursor<'a, 'b: 'a>(&'a self, cursor: &mut Cursor<'b>) -> bool;

    /// Returns true if there is still data and iterator is in correct range
    fn move_to_next<'iter, 'cursor: 'iter>(&'iter self, cursor: &'cursor mut Cursor<'cursor>) -> bool;

    /// Returns size hint considering current state of cursor
    ///
    /// The default implementation ignores the cursor and reports `(0, None)`.
    fn get_size_hint(&self, _cursor: &Cursor) -> (usize, Option<usize>) {
        (0, None)
    }
}


/// Iterator that owns a `Cursor` and lets an `IterateCursor` strategy decide
/// where iteration starts and when it stops.
#[derive(Debug)]
pub struct CursorIterator<'c, I> {
    // Strategy that positions and advances the cursor.
    inner: I,
    // Result of the last `init_cursor` / `move_to_next` call; `next` yields
    // nothing while this is false.
    has_data: bool,
    // Cursor being driven.
    cursor: Cursor<'c>,
    // Zero-sized marker carrying the `'c` lifetime.
    marker: ::std::marker::PhantomData<&'c ()>,
}

impl<'c, I: IterateCursor + 'c> CursorIterator<'c, I> {
    /// Builds an iterator from `cursor` and the strategy `inner`.
    ///
    /// The strategy positions the cursor right away; whatever `init_cursor`
    /// reports becomes the initial `has_data` state.
    fn wrap(mut cursor: Cursor<'c>, inner: I) -> CursorIterator<'c, I> {
        let positioned = inner.init_cursor(&mut cursor);
        CursorIterator {
            inner: inner,
            has_data: positioned,
            cursor: cursor,
            marker: ::std::marker::PhantomData,
        }
    }

    /// Discards the iterator and returns the cursor it was driving.
    #[allow(dead_code)]
    fn unwrap(self) -> Cursor<'c> {
        self.cursor
    }
}

impl<'c, I: IterateCursor + 'c> Iterator for CursorIterator<'c, I> {
    type Item = CursorValue<'c>;

    /// Yields the pair under the cursor, then lets the strategy advance the
    /// cursor so the following call knows whether anything is left.
    fn next(&mut self) -> Option<CursorValue<'c>> {
        if !self.has_data {
            return None;
        }

        let (key, value) = match self.cursor.get_plain() {
            Ok(pair) => pair,
            // A failed read ends this call without touching `has_data`.
            Err(_) => return None,
        };

        // The transmute only rewrites the lifetimes on `&mut Cursor` so the
        // reference fits the signature of `move_to_next`.
        self.has_data = unsafe { self.inner.move_to_next(mem::transmute(&mut self.cursor)) };

        Some(CursorValue {
            key: key,
            value: value,
            marker: ::std::marker::PhantomData,
        })
    }

    /// Delegates to the strategy's hint for the current cursor state.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner.get_size_hint(&self.cursor)
    }
}

/// Iteration strategy bounded by a start key and an end key.
#[derive(Debug)]
pub struct CursorKeyRangeIter<'a> {
    // Key handed to `to_gte_key` to position the cursor initially.
    start_key: MdbValue<'a>,
    // Key every visited position is compared against.
    end_key: MdbValue<'a>,
    // Forwarded to `is_less` on each comparison.
    // NOTE(review): presumably `true` lets a key equal to `end_key` through —
    // confirm against `is_less`.
    end_inclusive: bool,
    // Zero-sized marker carrying the `'a` lifetime.
    marker: ::std::marker::PhantomData<&'a ()>,
}

impl<'a> CursorKeyRangeIter<'a> {
    /// Creates a range strategy from borrowed start and end keys.
    ///
    /// Both keys are converted with `to_mdb_value` up front; `end_inclusive`
    /// is stored as given.
    pub fn new<K: ToMdbValue+'a>(start_key: &'a K, end_key: &'a K, end_inclusive: bool) -> CursorKeyRangeIter<'a> {
        let lower = start_key.to_mdb_value();
        let upper = end_key.to_mdb_value();
        CursorKeyRangeIter {
            start_key: lower,
            end_key: upper,
            end_inclusive: end_inclusive,
            marker: ::std::marker::PhantomData,
        }
    }
}

impl<'iter> IterateCursor for CursorKeyRangeIter<'iter> {
    /// Positions the cursor with `to_gte_key(start_key)` and then checks the
    /// resulting position against `end_key`.
    fn init_cursor<'a, 'b: 'a>(&'a self, cursor: & mut Cursor<'b>) -> bool {
        // The transmute only changes the lifetimes on the key reference.
        let positioned = unsafe {
            cursor.to_gte_key(mem::transmute::<&'a MdbValue<'a>, &'b MdbValue<'b>>(&self.start_key)).is_ok()
        };
        if !positioned {
            return false;
        }
        cursor.cmp_key(&self.end_key).is_less(self.end_inclusive)
    }

    /// Steps with `to_next_key`; reports false when the step fails or the
    /// comparison against `end_key` says the range is over.
    fn move_to_next<'i, 'c: 'i>(&'i self, cursor: &'c mut Cursor<'c>) -> bool {
        if cursor.to_next_key().is_err() {
            return false;
        }
        cursor.cmp_key(&self.end_key).is_less(self.end_inclusive)
    }
}

/// Iteration strategy that starts from a given key and has no end bound.
#[derive(Debug)]
pub struct CursorFromKeyIter<'a> {
    // Key handed to `to_gte_key` to position the cursor initially.
    start_key: MdbValue<'a>,
    // Zero-sized marker carrying the `'a` lifetime.
    marker: ::std::marker::PhantomData<&'a ()>,
}


impl<'a> CursorFromKeyIter<'a> {
    /// Creates the strategy from a borrowed start key, converting it with
    /// `to_mdb_value` up front.
    pub fn new<K: ToMdbValue+'a>(start_key: &'a K) -> CursorFromKeyIter<'a> {
        let lower = start_key.to_mdb_value();
        CursorFromKeyIter {
            start_key: lower,
            marker: ::std::marker::PhantomData,
        }
    }
}

impl<'iter> IterateCursor for CursorFromKeyIter<'iter> {
    /// Positions the cursor with `to_gte_key(start_key)`; true when that
    /// call succeeds.
    fn init_cursor<'a, 'b: 'a>(&'a self, cursor: & mut Cursor<'b>) -> bool {
        unsafe {
            // The transmute only changes the lifetimes on the key reference.
            let start = mem::transmute::<&'a MdbValue<'a>, &'b MdbValue<'b>>(&self.start_key);
            cursor.to_gte_key(start).is_ok()
        }
    }

    /// Steps with `to_next_key`; true for as long as that call succeeds.
    fn move_to_next<'i, 'c: 'i>(&'i self, cursor: &'c mut Cursor<'c>) -> bool {
        let stepped = cursor.to_next_key();
        stepped.is_ok()
    }
}


/// Iteration strategy that starts at the first entry and is bounded by an
/// end key.
#[derive(Debug)]
pub struct CursorToKeyIter<'a> {
    // Key every visited position is compared against.
    end_key: MdbValue<'a>,
    // Zero-sized marker carrying the `'a` lifetime.
    marker: ::std::marker::PhantomData<&'a ()>,
}


impl<'a> CursorToKeyIter<'a> {
    /// Creates the strategy from a borrowed end key, converting it with
    /// `to_mdb_value` up front.
    pub fn new<K: ToMdbValue+'a>(end_key: &'a K) -> CursorToKeyIter<'a> {
        let upper = end_key.to_mdb_value();
        CursorToKeyIter {
            end_key: upper,
            marker: ::std::marker::PhantomData,
        }
    }
}

impl<'iter> IterateCursor for CursorToKeyIter<'iter> {
    /// Moves to the first entry and then checks that position against
    /// `end_key` (always calling `is_less(false)`).
    fn init_cursor<'a, 'b: 'a>(&'a self, cursor: & mut Cursor<'b>) -> bool {
        if cursor.to_first().is_err() {
            return false;
        }
        cursor.cmp_key(&self.end_key).is_less(false)
    }

    /// Steps with `to_next_key`; reports false when the step fails or the
    /// comparison against `end_key` says the range is over.
    fn move_to_next<'i, 'c: 'i>(&'i self, cursor: &'c mut Cursor<'c>) -> bool {
        if cursor.to_next_key().is_err() {
            return false;
        }
        cursor.cmp_key(&self.end_key).is_less(false)
    }
}

/// Stateless iteration strategy: its `IterateCursor` implementation starts
/// at `to_first` and advances with `to_next_key`, with no bound checks.
#[allow(missing_copy_implementations)]
#[derive(Debug)]
pub struct CursorIter;


impl IterateCursor for CursorIter {
    /// Moves the cursor to the first entry; true when that call succeeds.
    fn init_cursor<'a, 'b: 'a>(&'a self, cursor: & mut Cursor<'b>) -> bool {
        cursor.to_first().is_ok()
    }

    /// Steps with `to_next_key`; true for as long as that call succeeds.
    fn move_to_next<'i, 'c: 'i>(&'i self, cursor: &'c mut Cursor<'c>) -> bool {
        cursor.to_next_key().is_ok()
    }
}


/// Iteration strategy over the items stored under a single key.
#[derive(Debug)]
pub struct CursorItemIter<'a> {
    // Key handed to `to_key` to position the cursor initially.
    key: MdbValue<'a>,
    // Zero-sized marker carrying the `'a` lifetime.
    marker: ::std::marker::PhantomData<&'a ()>,
}


impl<'a> CursorItemIter<'a> {
    /// Creates the strategy from a borrowed key, converting it with
    /// `to_mdb_value` up front.
    pub fn new<K: ToMdbValue+'a>(key: &'a K) -> CursorItemIter<'a> {
        let target = key.to_mdb_value();
        CursorItemIter {
            key: target,
            marker: ::std::marker::PhantomData,
        }
    }
}

impl<'iter> IterateCursor for CursorItemIter<'iter> {
    /// Positions the cursor with `to_key(key)`; true when that call succeeds.
    fn init_cursor<'a, 'b: 'a>(&'a self, cursor: & mut Cursor<'b>) -> bool {
        unsafe {
            // The transmute only changes the lifetimes on the key reference.
            let target = mem::transmute::<&MdbValue, &'b MdbValue<'b>>(&self.key);
            cursor.to_key(target).is_ok()
        }
    }

    /// Steps with `to_next_item`; true for as long as that call succeeds.
    fn move_to_next<'i, 'c: 'i>(&'i self, cursor: &'c mut Cursor<'c>) -> bool {
        let stepped = cursor.to_next_item();
        stepped.is_ok()
    }

    /// Uses `item_count` as the upper bound of the hint; the lower bound is
    /// always 0, and the upper bound is `None` when the count is unavailable.
    fn get_size_hint(&self, c: &Cursor) -> (usize, Option<usize>) {
        let upper = c.item_count().ok().map(|cnt| cnt as usize);
        (0, upper)
    }
}


/// Thin wrapper around a raw `MDB_val` (data pointer plus size) tagged with
/// a lifetime `'a`.
#[derive(Copy, Clone, Debug)]
pub struct MdbValue<'a> {
    // Raw pointer/size pair.
    value: MDB_val,
    // Zero-sized marker carrying the `'a` lifetime; the raw pointer itself
    // has none.
    marker: ::std::marker::PhantomData<&'a ()>,
}

impl<'a> MdbValue<'a> {
    /// Builds a value from a raw data pointer and a length in bytes.
    ///
    /// # Safety
    ///
    /// Nothing here checks `data` or ties it to `'a`. The caller must make
    /// sure it points to at least `len` readable bytes for as long as the
    /// returned value is in use.
    #[inline]
    pub unsafe fn new(data: *const c_void, len: usize) -> MdbValue<'a> {
        MdbValue {
            value: MDB_val {
                mv_data: data,
                mv_size: len as size_t
            },
            marker: ::std::marker::PhantomData
        }
    }

    /// Copies the pointer and size out of an existing `MDB_val`.
    ///
    /// # Safety
    ///
    /// `mdb_val` is dereferenced without any check, so it must point to a
    /// valid `MDB_val`. The requirements of `new` apply to the data it
    /// describes.
    #[inline]
    pub unsafe fn from_raw(mdb_val: *const ffi::MDB_val) -> MdbValue<'a> {
        MdbValue::new((*mdb_val).mv_data, (*mdb_val).mv_size as usize)
    }

    /// Builds a value that views the bytes of `data` in place; the size is
    /// `size_of::<T>()`.
    #[inline]
    pub fn new_from_sized<T>(data: &'a T) -> MdbValue<'a> {
        // Plain pointer casts instead of `transmute`: the reference-to-pointer
        // conversion is checked by the compiler this way.
        let ptr = data as *const T as *const c_void;
        // SAFETY: `ptr` comes from a live `&'a T`, so it is readable for
        // `size_of::<T>()` bytes for the whole of `'a`.
        unsafe {
            MdbValue::new(ptr, mem::size_of::<T>())
        }
    }

    /// Returns the stored raw data pointer.
    ///
    /// # Safety
    ///
    /// NOTE(review): the body only reads a field; presumably this is marked
    /// `unsafe` because the pointer is only meaningful while the underlying
    /// data is alive — confirm the intended contract.
    #[inline]
    pub unsafe fn get_ref(&'a self) -> *const c_void {
        self.value.mv_data
    }

    /// Returns the stored size in bytes.
    #[inline]
    pub fn get_size(&self) -> usize {
        self.value.mv_size as usize
    }
}