cloudproof_findex 6.0.2

Cosmian Findex Cloudproof library
Documentation
# -*- coding: utf-8 -*-
import os
import requests
import redis
import unittest

from typing import Set

from cloudproof_findex import (
    AuthorizationToken,
    Findex,
    IndexedValuesAndKeywords,
    Keyword,
    Location,
    Key,
    ProgressResults,
    PythonCallbacks,
)


class TestStructures(unittest.TestCase):
    def test_location(self) -> None:
        # from string
        input_string = 'test location'
        loc_str = Location.from_string(input_string)
        # conversion
        self.assertEqual(str(loc_str), input_string)
        # comparison
        self.assertEqual(loc_str, input_string)
        self.assertNotEqual(loc_str, 'wrong str')
        # hash
        self.assertEqual(hash(loc_str), hash(input_string))

        # from int
        input_int = 2**51 - 1
        loi32 = Location.from_int(input_int)
        self.assertEqual(int(loi32), input_int)
        # comparison
        self.assertEqual(loi32, input_int)
        self.assertNotEqual(loi32, 2**51 - 2)
        # hash not working for int

        # from bytes
        input_bytes = b'test location'
        loc_bytes = Location.from_bytes(input_bytes)
        self.assertEqual(bytes(loc_bytes), input_bytes)
        # comparison
        self.assertEqual(loc_bytes, input_bytes)
        self.assertNotEqual(loc_bytes, b'wrong bytes')
        # hash
        self.assertEqual(hash(loc_bytes), hash(input_bytes))

        # comparison between keywords
        self.assertEqual(loc_str, loc_bytes)
        self.assertNotEqual(loc_str, loi32)

    def test_keyword(self) -> None:
        # from string
        input_string = 'test keyword'
        kw_str = Keyword.from_string(input_string)
        # conversion
        self.assertEqual(str(kw_str), input_string)
        # comparison
        self.assertEqual(kw_str, input_string)
        self.assertNotEqual(kw_str, 'wrong str')
        # hash
        self.assertEqual(hash(kw_str), hash(input_string))

        # from int
        input_int = 2**51 - 1
        kw_int = Keyword.from_int(input_int)
        self.assertEqual(int(kw_int), input_int)
        # comparison
        self.assertEqual(kw_int, input_int)
        self.assertNotEqual(kw_int, 2**51 - 2)
        # hash not working for int

        # from bytes
        input_bytes = b'test keyword'
        kw_bytes = Keyword.from_bytes(input_bytes)
        self.assertEqual(bytes(kw_bytes), input_bytes)
        # comparison
        self.assertEqual(kw_bytes, input_bytes)
        self.assertNotEqual(kw_bytes, b'wrong bytes')
        # hash
        self.assertEqual(hash(kw_bytes), hash(input_bytes))

        # comparison between keywords
        self.assertEqual(kw_str, kw_bytes)
        self.assertNotEqual(kw_str, kw_int)

    def test_keys(self) -> None:
        msk = Key.random()
        self.assertIsInstance(msk, Key)

        saved_bytes = msk.to_bytes()
        reloaded_msk = Key.from_bytes(saved_bytes)
        self.assertEqual(saved_bytes, reloaded_msk.to_bytes())

        with self.assertRaises(ValueError):
            Key.from_bytes(b'wrong size')


def define_custom_db_interface(is_with_test: bool = False):
    table: dict = {}

    def fetch(uids):
        res = {}
        for uid in uids:
            if uid in table:
                res[uid] = table[uid]
        return res

    def upsert(old_values: dict, new_values: dict):
        res = {}
        for uid, new_value in new_values.items():
            current_value = table.get(uid)
            old_value = old_values.get(uid)
            if old_value == current_value:
                table[uid] = new_value
            elif not current_value:
                raise ValueError('The current value needs to be defined as long as the old value is defined ')
            else:
                res[uid] = current_value
        return res

    def insert(items):
        for uid, value in items.items():
            if uid in table:
                raise ValueError('collision in insert operation on UID: ' + uid)
            table[uid] = value

    def delete(uids):
        for uid in uids:
            table.pop(uid)

    def dump_tokens():
        return table.keys()

    if is_with_test:
        k1 = 'my first key'
        k2 = 'my second key'
        k3 = 'my third key'
        v1 = [1, 2, 3]
        v2 = [4, 5, 6]
        v3 = [7, 8, 9]

        # Test values can be upserted.
        res = upsert({}, {k1: v1})
        assert not res
        assert v1 == fetch([k1])[k1]

        res = upsert({k1: v1}, {k1: v2})
        assert not res
        assert v2 == fetch([k1])[k1]

        res = upsert({k1: v1}, {k1: v3})
        assert res == {k1: v2}
        assert v2 == fetch([k1])[k1]

        assert {k1} == dump_tokens()

        insert({k3: v1})
        assert v1 == fetch([k3])[k3]
        assert not fetch([k2])

        try:
            insert({k1: v2})
            raise ValueError('collision on key: ' + k1)
        except:
            pass

        # clear test values
        table = {}

    in_memory_db_interface = PythonCallbacks.new()
    in_memory_db_interface.set_fetch(fetch)
    in_memory_db_interface.set_upsert(upsert)
    in_memory_db_interface.set_insert(insert)
    in_memory_db_interface.set_delete(delete)
    in_memory_db_interface.set_dump_tokens(dump_tokens)

    return in_memory_db_interface


class TestFindex(unittest.TestCase):
    def setUp(self) -> None:
        # Create structures needed by Findex
        self.findex_key = Key.random()
        self.label = "My label."

        self.db = {
            1: ['Martin', 'Sheperd'],
            2: ['Martial', 'Wilkins'],
            3: ['John', 'Sheperd'],
        }

        # Parameters used by the REST interface
        rest_server_url = 'http://localhost:8080'
        res = requests.post(
            rest_server_url + '/indexes',
            headers={'Content-Type': 'application/json'},
            json={'name': 'Test'},
            timeout=5,
        )
        response = res.json()
        token = AuthorizationToken.new(
            index_id=response['public_id'],
            findex_key=Key.random(),
            fetch_entries_key=Key.from_bytes(response['fetch_entries_key']),
            fetch_chains_key=Key.from_bytes(response['fetch_chains_key']),
            upsert_entries_key=Key.from_bytes(response['upsert_entries_key']),
            insert_chains_key=Key.from_bytes(response['insert_chains_key']),
        )

        in_memory_db_interface = define_custom_db_interface()

        sqlite_path = '/tmp/cloudproof_findex.sqlite'
        redis_host = 'localhost'
        redis_port = 6379
        redis_url = f'redis://{redis_host}:{redis_port}'

        if os.path.exists(sqlite_path):
            os.remove(sqlite_path)

        r = redis.Redis(host=redis_host, port=redis_port, db=0)
        print(redis_url)
        r.flushdb()

        self.findex_interfaces = {
            'sqlite': Findex.new_with_sqlite_interface(
                self.findex_key,
                self.label,
                sqlite_path,
            ),
            'redis': Findex.new_with_redis_interface(
                self.findex_key,
                self.label,
                redis_url,
            ),
            'rest': Findex.new_with_rest_interface(self.label,
                                                   str(token),
                                                   rest_server_url),
            'custom': Findex.new_with_custom_interface(
                self.findex_key, self.label, in_memory_db_interface, in_memory_db_interface
            ),
        }

    def test_upsert(self) -> None:
        indexed_values_and_keywords: IndexedValuesAndKeywords = {
            Location.from_int(k): v for k, v in self.db.items()
        }

        for interface, instance in self.findex_interfaces.items():
            print('Test upserting on {} interface.', interface)
            res = instance.add(indexed_values_and_keywords)
            # 5 keywords returned since "Sheperd" is duplicated
            self.assertEqual(len(res), 5)

            res = instance.add({Location.from_int(4): ['John', 'Snow']})
            # 1 keyword returned since "John" is already indexed
            self.assertEqual(res, set(['Snow']))

    def test_upsert_search(self) -> None:
        indexed_values_and_keywords: IndexedValuesAndKeywords = {
            Location.from_int(k): v for k, v in self.db.items()
        }

        for interface, instance in self.findex_interfaces.items():
            print('Test upserting and search on {} interface.', interface)
            instance.add(indexed_values_and_keywords)

            res = instance.search([Keyword.from_bytes(b'Martial')])
            self.assertEqual(len(res), 1)
            self.assertEqual(len(res[Keyword.from_string('Martial')]), 1)
            self.assertEqual(int(res['Martial'][0]), 2)

            res = instance.search(['Sheperd', 'Wilkins'])
            self.assertEqual(len(res['Sheperd']), 2)
            self.assertEqual(len(res['Wilkins']), 1)

    def test_graph_upsert_search(self) -> None:
        indexed_values_and_keywords: IndexedValuesAndKeywords = {
            Location.from_int(k): v for k, v in self.db.items()
        }

        for interface, instance in self.findex_interfaces.items():
            print(f'Test graph upserting and search on {interface} interface.')
            instance.add(indexed_values_and_keywords)

            # Adding custom keywords graph
            graph: IndexedValuesAndKeywords = {
                Keyword.from_string('Mart'): ['Mar'],
                Keyword.from_string('Marti'): ['Mart'],
                Keyword.from_string('Martin'): ['Marti'],
                Keyword.from_string('Martia'): ['Marti'],
                Keyword.from_string('Martial'): ['Martia'],
            }
            instance.add(graph)

            res = instance.search(['Mar'])
            # 2 names starting with Mar
            self.assertEqual(len(res['Mar']), 2)

            # Test progress callback
            def false_progress_callback(res: ProgressResults) -> bool:
                self.assertEqual(len(res['Mar']), 1)
                return True

            res = instance.search(
                ['Mar'],
                interrupt=false_progress_callback,
            )
            # no locations returned since the progress_callback stopped the recursion
            self.assertEqual(len(res['Mar']), 0)

            def early_stop_progress_callback(res: ProgressResults) -> bool:
                if 'Martin' in res:
                    return True
                return False

            res = instance.search(
                ['Mar'],
                interrupt=early_stop_progress_callback,
            )
            # only one location found after early stopping
            self.assertEqual(len(res['Mar']), 1)

    def test_compact(self) -> None:
        indexed_values_and_keywords: IndexedValuesAndKeywords = {
            Location.from_int(k): v for k, v in self.db.items()
        }

        interfaces = [
            (interface, instance)
            for interface, instance in self.findex_interfaces.items()
            if interface == 'sqlite'
        ]

        for interface, instance in interfaces:
            print(f'Test compacting and search on {interface} interface.')

            instance.add(indexed_values_and_keywords)

            # removing 2nd db line
            new_label = "My renewed label"

            filtered_locations = { Location.from_int(2) }

            def filter_obsolete_data(dataset: Set[Location]):
                res = set()
                for data in dataset:
                    if data not in filtered_locations:
                        res.add(data)
                return res

            instance.compact(self.findex_key, new_label, 1, filter_obsolete_data)

            # now new_label can perform search
            res = instance.search(['Sheperd'])
            self.assertEqual(len(res['Sheperd']), 2)

            # and the keywords corresponding to the 2nd line have been removed
            res = instance.search(['Martial', 'Wilkins'])
            self.assertEqual(len(res['Martial']), 0)
            self.assertEqual(len(res['Wilkins']), 0)


if __name__ == '__main__':
    define_custom_db_interface(True)
    unittest.main()